Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2023-01-09 11:49:04 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2023-01-09 11:49:04 +0300
commit22783620ad85d2aa4b5e6f628231d9bd8ca341d8 (patch)
tree3e75d2e819249ebce15c260858779ebf38c17ae7
parentfaa85e577ef9dd8063e7a1096d04e68dd22da3fc (diff)
parent260cb9c9839bcc18c2f6325aa0cc9ef1bbd4eeee (diff)
Merge branch 'pks-limithandler-refactorings' into 'master'
limithandler: Refactor concurrency limit handler See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/5239 Merged-by: Patrick Steinhardt <psteinhardt@gitlab.com> Approved-by: Steve Azzopardi <sazzopardi@gitlab.com> Approved-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
-rw-r--r--internal/middleware/limithandler/concurrency_limiter.go228
-rw-r--r--internal/middleware/limithandler/middleware_test.go15
2 files changed, 142 insertions, 101 deletions
diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go
index adda2a935..0267dc3c1 100644
--- a/internal/middleware/limithandler/concurrency_limiter.go
+++ b/internal/middleware/limithandler/concurrency_limiter.go
@@ -26,33 +26,22 @@ var ErrMaxQueueSize = errors.New("maximum queue size reached")
// QueueTickerCreator is a function that provides a ticker
type QueueTickerCreator func() helper.Ticker
-// ConcurrencyLimiter contains rate limiter state
-type ConcurrencyLimiter struct {
- semaphores map[string]*semaphoreReference
- // maxPerKey is the maximum number of concurrent operations
- // per lockKey
- maxPerKey int64
- // queued tracks the current number of operations waiting to be picked up
- queued int64
- // queuedLimit is the maximum number of operations allowed to wait in a queued state.
- // subsequent incoming operations will fail with an error.
- queuedLimit int64
- monitor ConcurrencyMonitor
- mux sync.RWMutex
- maxWaitTickerGetter QueueTickerCreator
-}
-
-type semaphoreReference struct {
- tokens chan struct{}
- count int
- newTicker QueueTickerCreator
+// keyedConcurrencyLimiter is a concurrency limiter that applies to a specific keyed resource.
+type keyedConcurrencyLimiter struct {
+ // tokens is the channel of available tokens, where every token allows one concurrent call
+ // to the concurrency-limited function.
+ tokens chan struct{}
+ refcount int
+ maxQueuedTickerCreator QueueTickerCreator
}
-func (sem *semaphoreReference) acquire(ctx context.Context) error {
+// acquire tries to acquire the semaphore. It may fail if there is a max queue-time ticker that
+// ticks before acquiring a token.
+func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context) error {
var ticker helper.Ticker
- if sem.newTicker != nil {
- ticker = sem.newTicker()
+ if sem.maxQueuedTickerCreator != nil {
+ ticker = sem.maxQueuedTickerCreator()
} else {
ticker = helper.Ticker(helper.NewManualTicker())
}
@@ -70,84 +59,67 @@ func (sem *semaphoreReference) acquire(ctx context.Context) error {
}
}
-func (sem *semaphoreReference) release() { <-sem.tokens }
-
-// Lazy create a semaphore for the given key
-func (c *ConcurrencyLimiter) getSemaphore(lockKey string) *semaphoreReference {
- c.mux.Lock()
- defer c.mux.Unlock()
-
- if c.semaphores[lockKey] == nil {
- c.semaphores[lockKey] = &semaphoreReference{
- tokens: make(chan struct{}, c.maxPerKey),
- newTicker: c.maxWaitTickerGetter,
- }
- }
-
- c.semaphores[lockKey].count++
- return c.semaphores[lockKey]
-}
-
-func (c *ConcurrencyLimiter) putSemaphore(lockKey string) {
- c.mux.Lock()
- defer c.mux.Unlock()
-
- ref := c.semaphores[lockKey]
- if ref == nil {
- panic("semaphore should be in the map")
- }
-
- if ref.count <= 0 {
- panic(fmt.Sprintf("bad semaphore ref count %d", ref.count))
- }
-
- ref.count--
- if ref.count == 0 {
- delete(c.semaphores, lockKey)
- }
-}
-
-func (c *ConcurrencyLimiter) countSemaphores() int {
- c.mux.RLock()
- defer c.mux.RUnlock()
+// release releases the semaphore.
+func (sem *keyedConcurrencyLimiter) release() { <-sem.tokens }
- return len(c.semaphores)
+// ConcurrencyLimiter contains rate limiter state
+type ConcurrencyLimiter struct {
+ // maxConcurrencyLimit is the maximum number of concurrent calls to the limited function.
+ // This limit is per key.
+ maxConcurrencyLimit int64
+ // maxQueueLength is the maximum number of operations allowed to wait in a queued state.
+ // This limit is global and applies before the concurrency limit. Subsequent incoming
+ // operations will be rejected with an error immediately.
+ maxQueueLength int64
+ // maxQueuedTickerCreator is a function that creates a ticker used to determine how long a
+ // call may be queued.
+ maxQueuedTickerCreator QueueTickerCreator
+
+ // monitor is a monitor that will get notified of the state of concurrency-limited RPC
+ // calls.
+ monitor ConcurrencyMonitor
+
+ m sync.RWMutex
+ // queued tracks the current number of operations waiting in the admission queue.
+ queued int64
+ // limitsByKey tracks all concurrency limits per key. Its per-key entries are lazily created
+ // and will get evicted once there are no concurrency-limited calls for any such key
+ // anymore.
+ limitsByKey map[string]*keyedConcurrencyLimiter
}
-func (c *ConcurrencyLimiter) queueInc(ctx context.Context) error {
- c.mux.Lock()
- defer c.mux.Unlock()
-
- if c.queuedLimit > 0 &&
- c.queued >= c.queuedLimit {
- c.monitor.Dropped(ctx, "max_size")
- return ErrMaxQueueSize
+// NewConcurrencyLimiter creates a new concurrency rate limiter.
+func NewConcurrencyLimiter(maxConcurrencyLimit, maxQueueLength int, maxQueuedTickerCreator QueueTickerCreator, monitor ConcurrencyMonitor) *ConcurrencyLimiter {
+ if monitor == nil {
+ monitor = NewNoopConcurrencyMonitor()
}
- c.queued++
- return nil
-}
-
-func (c *ConcurrencyLimiter) queueDec(decremented *bool) {
- if decremented == nil || *decremented {
- return
+ return &ConcurrencyLimiter{
+ maxConcurrencyLimit: int64(maxConcurrencyLimit),
+ maxQueueLength: int64(maxQueueLength),
+ maxQueuedTickerCreator: maxQueuedTickerCreator,
+ monitor: monitor,
+ limitsByKey: make(map[string]*keyedConcurrencyLimiter),
}
- *decremented = true
- c.mux.Lock()
- defer c.mux.Unlock()
-
- c.queued--
}
-// Limit will limit the concurrency of f
-func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey string, f LimitedFunc) (interface{}, error) {
- if c.maxPerKey <= 0 {
+// Limit will limit the concurrency of the limited function f. There are two distinct mechanisms
+// that limit execution of the function:
+//
+// 1. First, every call will enter the queue. This queue is global across all keys and limits how
+// many callers may try to acquire their per-key semaphore at the same time. If the queue is full
+// the caller will be rejected.
+// 2. Second, when the caller has successfully entered the queue, they try to acquire their per-key
+// semaphore. If this takes longer than the maximum queueing limit then the caller will be
+// dequeued and gets an error.
+func (c *ConcurrencyLimiter) Limit(ctx context.Context, limitingKey string, f LimitedFunc) (interface{}, error) {
+ if c.maxConcurrencyLimit <= 0 {
return f()
}
var decremented bool
- log := ctxlogrus.Extract(ctx).WithField("limiting_key", lockKey)
+ log := ctxlogrus.Extract(ctx).WithField("limiting_key", limitingKey)
if err := c.queueInc(ctx); err != nil {
if errors.Is(err, ErrMaxQueueSize) {
return nil, structerr.NewResourceExhausted("%w", ErrMaxQueueSize).WithDetail(
@@ -166,8 +138,8 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey string, f Limite
start := time.Now()
c.monitor.Queued(ctx)
- sem := c.getSemaphore(lockKey)
- defer c.putSemaphore(lockKey)
+ sem := c.getConcurrencyLimit(limitingKey)
+ defer c.putConcurrencyLimit(limitingKey)
err := sem.acquire(ctx)
c.queueDec(&decremented)
@@ -194,19 +166,75 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey string, f Limite
return f()
}
-// NewConcurrencyLimiter creates a new concurrency rate limiter
-func NewConcurrencyLimiter(perKeyLimit, globalLimit int, maxWaitTickerGetter QueueTickerCreator, monitor ConcurrencyMonitor) *ConcurrencyLimiter {
- if monitor == nil {
- monitor = NewNoopConcurrencyMonitor()
+// getConcurrencyLimit retrieves the concurrency limit for the given key. If no such limiter exists
+// it will be lazily constructed.
+func (c *ConcurrencyLimiter) getConcurrencyLimit(limitingKey string) *keyedConcurrencyLimiter {
+ c.m.Lock()
+ defer c.m.Unlock()
+
+ if c.limitsByKey[limitingKey] == nil {
+ c.limitsByKey[limitingKey] = &keyedConcurrencyLimiter{
+ tokens: make(chan struct{}, c.maxConcurrencyLimit),
+ maxQueuedTickerCreator: c.maxQueuedTickerCreator,
+ }
}
- return &ConcurrencyLimiter{
- semaphores: make(map[string]*semaphoreReference),
- maxPerKey: int64(perKeyLimit),
- queuedLimit: int64(globalLimit),
- monitor: monitor,
- maxWaitTickerGetter: maxWaitTickerGetter,
+ c.limitsByKey[limitingKey].refcount++
+ return c.limitsByKey[limitingKey]
+}
+
+// putConcurrencyLimit drops the reference to the concurrency limit identified by the given key.
+// This must only ever be called after `getConcurrencyLimit()` for the same key. If the reference
+// count of the concurrency limit drops to zero then it will be destroyed.
+func (c *ConcurrencyLimiter) putConcurrencyLimit(limitingKey string) {
+ c.m.Lock()
+ defer c.m.Unlock()
+
+ ref := c.limitsByKey[limitingKey]
+ if ref == nil {
+ panic("semaphore should be in the map")
+ }
+
+ if ref.refcount <= 0 {
+ panic(fmt.Sprintf("bad semaphore ref refcount %d", ref.refcount))
+ }
+
+ ref.refcount--
+ if ref.refcount == 0 {
+ delete(c.limitsByKey, limitingKey)
+ }
+}
+
+func (c *ConcurrencyLimiter) countSemaphores() int {
+ c.m.RLock()
+ defer c.m.RUnlock()
+
+ return len(c.limitsByKey)
+}
+
+func (c *ConcurrencyLimiter) queueInc(ctx context.Context) error {
+ c.m.Lock()
+ defer c.m.Unlock()
+
+ if c.maxQueueLength > 0 &&
+ c.queued >= c.maxQueueLength {
+ c.monitor.Dropped(ctx, "max_size")
+ return ErrMaxQueueSize
+ }
+
+ c.queued++
+ return nil
+}
+
+func (c *ConcurrencyLimiter) queueDec(decremented *bool) {
+ if decremented == nil || *decremented {
+ return
}
+ *decremented = true
+ c.m.Lock()
+ defer c.m.Unlock()
+
+ c.queued--
}
// WithConcurrencyLimiters sets up middleware to limit the concurrency of
diff --git a/internal/middleware/limithandler/middleware_test.go b/internal/middleware/limithandler/middleware_test.go
index c78cfa9b2..3fc545c76 100644
--- a/internal/middleware/limithandler/middleware_test.go
+++ b/internal/middleware/limithandler/middleware_test.go
@@ -201,7 +201,20 @@ func TestStreamLimitHandler(t *testing.T) {
// id, but subsequent requests in a stream, even with the same
// id, should bypass the concurrency limiter
for i := 0; i < 10; i++ {
- require.NoError(t, stream.Send(&grpc_testing.StreamingOutputCallRequest{}))
+ // Rate-limiting the stream is happening asynchronously when
+ // the server-side receives the first message. When the rate
+ // limiter then decides that the RPC call must be limited,
+ // it will close the stream.
+ //
+ // It may thus happen that we already see an EOF here in
+ // case the closed stream is received on the client-side
+ // before we have sent all requests. We thus need to special
+ // case this specific error code and will just stop sending
+ // requests in that case.
+ if err := stream.Send(&grpc_testing.StreamingOutputCallRequest{}); err != nil {
+ require.Equal(t, io.EOF, err)
+ break
+ }
}
require.NoError(t, stream.CloseSend())