author     Patrick Steinhardt <psteinhardt@gitlab.com>  2023-01-09 11:49:04 +0300
committer  Patrick Steinhardt <psteinhardt@gitlab.com>  2023-01-09 11:49:04 +0300
commit     22783620ad85d2aa4b5e6f628231d9bd8ca341d8 (patch)
tree       3e75d2e819249ebce15c260858779ebf38c17ae7
parent     faa85e577ef9dd8063e7a1096d04e68dd22da3fc (diff)
parent     260cb9c9839bcc18c2f6325aa0cc9ef1bbd4eeee (diff)
Merge branch 'pks-limithandler-refactorings' into 'master'
limithandler: Refactor concurrency limit handler
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/5239
Merged-by: Patrick Steinhardt <psteinhardt@gitlab.com>
Approved-by: Steve Azzopardi <sazzopardi@gitlab.com>
Approved-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
-rw-r--r--  internal/middleware/limithandler/concurrency_limiter.go  228
-rw-r--r--  internal/middleware/limithandler/middleware_test.go       15
2 files changed, 142 insertions, 101 deletions
diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go
index adda2a935..0267dc3c1 100644
--- a/internal/middleware/limithandler/concurrency_limiter.go
+++ b/internal/middleware/limithandler/concurrency_limiter.go
@@ -26,33 +26,22 @@ var ErrMaxQueueSize = errors.New("maximum queue size reached")
 // QueueTickerCreator is a function that provides a ticker
 type QueueTickerCreator func() helper.Ticker
 
-// ConcurrencyLimiter contains rate limiter state
-type ConcurrencyLimiter struct {
-    semaphores map[string]*semaphoreReference
-    // maxPerKey is the maximum number of concurrent operations
-    // per lockKey
-    maxPerKey int64
-    // queued tracks the current number of operations waiting to be picked up
-    queued int64
-    // queuedLimit is the maximum number of operations allowed to wait in a queued state.
-    // subsequent incoming operations will fail with an error.
-    queuedLimit int64
-    monitor ConcurrencyMonitor
-    mux sync.RWMutex
-    maxWaitTickerGetter QueueTickerCreator
-}
-
-type semaphoreReference struct {
-    tokens chan struct{}
-    count int
-    newTicker QueueTickerCreator
+// keyedConcurrencyLimiter is a concurrency limiter that applies to a specific keyed resource.
+type keyedConcurrencyLimiter struct {
+    // tokens is the channel of available tokens, where every token allows one concurrent call
+    // to the concurrency-limited function.
+    tokens chan struct{}
+    refcount int
+    maxQueuedTickerCreator QueueTickerCreator
 }
 
-func (sem *semaphoreReference) acquire(ctx context.Context) error {
+// acquire tries to acquire the semaphore. It may fail if there is a max queue-time ticker that
+// ticks before acquiring a token.
+func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context) error {
     var ticker helper.Ticker
-    if sem.newTicker != nil {
-        ticker = sem.newTicker()
+    if sem.maxQueuedTickerCreator != nil {
+        ticker = sem.maxQueuedTickerCreator()
     } else {
         ticker = helper.Ticker(helper.NewManualTicker())
     }
@@ -70,84 +59,67 @@ func (sem *semaphoreReference) acquire(ctx context.Context) error {
     }
 }
 
-func (sem *semaphoreReference) release() { <-sem.tokens }
-
-// Lazy create a semaphore for the given key
-func (c *ConcurrencyLimiter) getSemaphore(lockKey string) *semaphoreReference {
-    c.mux.Lock()
-    defer c.mux.Unlock()
-
-    if c.semaphores[lockKey] == nil {
-        c.semaphores[lockKey] = &semaphoreReference{
-            tokens: make(chan struct{}, c.maxPerKey),
-            newTicker: c.maxWaitTickerGetter,
-        }
-    }
-
-    c.semaphores[lockKey].count++
-    return c.semaphores[lockKey]
-}
-
-func (c *ConcurrencyLimiter) putSemaphore(lockKey string) {
-    c.mux.Lock()
-    defer c.mux.Unlock()
-
-    ref := c.semaphores[lockKey]
-    if ref == nil {
-        panic("semaphore should be in the map")
-    }
-
-    if ref.count <= 0 {
-        panic(fmt.Sprintf("bad semaphore ref count %d", ref.count))
-    }
-
-    ref.count--
-    if ref.count == 0 {
-        delete(c.semaphores, lockKey)
-    }
-}
-
-func (c *ConcurrencyLimiter) countSemaphores() int {
-    c.mux.RLock()
-    defer c.mux.RUnlock()
+// release releases the semaphore.
+func (sem *keyedConcurrencyLimiter) release() { <-sem.tokens }
 
-    return len(c.semaphores)
+// ConcurrencyLimiter contains rate limiter state
+type ConcurrencyLimiter struct {
+    // maxConcurrencyLimit is the maximum number of concurrent calls to the limited function.
+    // This limit is per key.
+    maxConcurrencyLimit int64
+    // maxQueueLength is the maximum number of operations allowed to wait in a queued state.
+    // This limit is global and applies before the concurrency limit. Subsequent incoming
+    // operations will be rejected with an error immediately.
+    maxQueueLength int64
+    // maxQueuedTickerCreator is a function that creates a ticker used to determine how long a
+    // call may be queued.
+    maxQueuedTickerCreator QueueTickerCreator
+
+    // monitor is a monitor that will get notified of the state of concurrency-limited RPC
+    // calls.
+    monitor ConcurrencyMonitor
+
+    m sync.RWMutex
+    // queued tracks the current number of operations waiting in the admission queue.
+    queued int64
+    // limitsByKey tracks all concurrency limits per key. Its per-key entries are lazily created
+    // and will get evicted once there are no concurrency-limited calls for any such key
+    // anymore.
+    limitsByKey map[string]*keyedConcurrencyLimiter
 }
 
-func (c *ConcurrencyLimiter) queueInc(ctx context.Context) error {
-    c.mux.Lock()
-    defer c.mux.Unlock()
-
-    if c.queuedLimit > 0 &&
-        c.queued >= c.queuedLimit {
-        c.monitor.Dropped(ctx, "max_size")
-        return ErrMaxQueueSize
+// NewConcurrencyLimiter creates a new concurrency rate limiter.
+func NewConcurrencyLimiter(maxConcurrencyLimit, maxQueueLength int, maxQueuedTickerCreator QueueTickerCreator, monitor ConcurrencyMonitor) *ConcurrencyLimiter {
+    if monitor == nil {
+        monitor = NewNoopConcurrencyMonitor()
     }
 
-    c.queued++
-    return nil
-}
-
-func (c *ConcurrencyLimiter) queueDec(decremented *bool) {
-    if decremented == nil || *decremented {
-        return
+    return &ConcurrencyLimiter{
+        maxConcurrencyLimit: int64(maxConcurrencyLimit),
+        maxQueueLength: int64(maxQueueLength),
+        maxQueuedTickerCreator: maxQueuedTickerCreator,
+        monitor: monitor,
+        limitsByKey: make(map[string]*keyedConcurrencyLimiter),
     }
-    *decremented = true
-    c.mux.Lock()
-    defer c.mux.Unlock()
-
-    c.queued--
 }
 
-// Limit will limit the concurrency of f
-func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey string, f LimitedFunc) (interface{}, error) {
-    if c.maxPerKey <= 0 {
+// Limit will limit the concurrency of the limited function f. There are two distinct mechanisms
+// that limit execution of the function:
+//
+// 1. First, every call will enter the queue. This queue is global across all keys and limits how
+// many callers may try to acquire their per-key semaphore at the same time. If the queue is full
+// the caller will be rejected.
+// 2. Second, when the caller has successfully entered the queue, they try to acquire their per-key
+// semaphore. If this takes longer than the maximum queueing limit then the caller will be
+// dequeued and gets an error.
+func (c *ConcurrencyLimiter) Limit(ctx context.Context, limitingKey string, f LimitedFunc) (interface{}, error) {
+    if c.maxConcurrencyLimit <= 0 {
         return f()
     }
 
     var decremented bool
 
-    log := ctxlogrus.Extract(ctx).WithField("limiting_key", lockKey)
+    log := ctxlogrus.Extract(ctx).WithField("limiting_key", limitingKey)
     if err := c.queueInc(ctx); err != nil {
         if errors.Is(err, ErrMaxQueueSize) {
             return nil, structerr.NewResourceExhausted("%w", ErrMaxQueueSize).WithDetail(
@@ -166,8 +138,8 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey string, f Limite
     start := time.Now()
     c.monitor.Queued(ctx)
 
-    sem := c.getSemaphore(lockKey)
-    defer c.putSemaphore(lockKey)
+    sem := c.getConcurrencyLimit(limitingKey)
+    defer c.putConcurrencyLimit(limitingKey)
 
     err := sem.acquire(ctx)
     c.queueDec(&decremented)
@@ -194,19 +166,75 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey string, f Limite
     return f()
 }
 
-// NewConcurrencyLimiter creates a new concurrency rate limiter
-func NewConcurrencyLimiter(perKeyLimit, globalLimit int, maxWaitTickerGetter QueueTickerCreator, monitor ConcurrencyMonitor) *ConcurrencyLimiter {
-    if monitor == nil {
-        monitor = NewNoopConcurrencyMonitor()
+// getConcurrencyLimit retrieves the concurrency limit for the given key. If no such limiter exists
+// it will be lazily constructed.
+func (c *ConcurrencyLimiter) getConcurrencyLimit(limitingKey string) *keyedConcurrencyLimiter {
+    c.m.Lock()
+    defer c.m.Unlock()
+
+    if c.limitsByKey[limitingKey] == nil {
+        c.limitsByKey[limitingKey] = &keyedConcurrencyLimiter{
+            tokens: make(chan struct{}, c.maxConcurrencyLimit),
+            maxQueuedTickerCreator: c.maxQueuedTickerCreator,
+        }
     }
 
-    return &ConcurrencyLimiter{
-        semaphores: make(map[string]*semaphoreReference),
-        maxPerKey: int64(perKeyLimit),
-        queuedLimit: int64(globalLimit),
-        monitor: monitor,
-        maxWaitTickerGetter: maxWaitTickerGetter,
+    c.limitsByKey[limitingKey].refcount++
+    return c.limitsByKey[limitingKey]
+}
+
+// putConcurrencyLimit drops the reference to the concurrency limit identified by the given key.
+// This must only ever be called after `getConcurrencyLimit()` for the same key. If the reference
+// count of the concurrency limit drops to zero then it will be destroyed.
+func (c *ConcurrencyLimiter) putConcurrencyLimit(limitingKey string) {
+    c.m.Lock()
+    defer c.m.Unlock()
+
+    ref := c.limitsByKey[limitingKey]
+    if ref == nil {
+        panic("semaphore should be in the map")
+    }
+
+    if ref.refcount <= 0 {
+        panic(fmt.Sprintf("bad semaphore ref refcount %d", ref.refcount))
+    }
+
+    ref.refcount--
+    if ref.refcount == 0 {
+        delete(c.limitsByKey, limitingKey)
+    }
+}
+
+func (c *ConcurrencyLimiter) countSemaphores() int {
+    c.m.RLock()
+    defer c.m.RUnlock()
+
+    return len(c.limitsByKey)
+}
+
+func (c *ConcurrencyLimiter) queueInc(ctx context.Context) error {
+    c.m.Lock()
+    defer c.m.Unlock()
+
+    if c.maxQueueLength > 0 &&
+        c.queued >= c.maxQueueLength {
+        c.monitor.Dropped(ctx, "max_size")
+        return ErrMaxQueueSize
+    }
+
+    c.queued++
+    return nil
+}
+
+func (c *ConcurrencyLimiter) queueDec(decremented *bool) {
+    if decremented == nil || *decremented {
+        return
     }
+
+    *decremented = true
+    c.m.Lock()
+    defer c.m.Unlock()
+
+    c.queued--
 }
 
 // WithConcurrencyLimiters sets up middleware to limit the concurrency of
diff --git a/internal/middleware/limithandler/middleware_test.go b/internal/middleware/limithandler/middleware_test.go
index c78cfa9b2..3fc545c76 100644
--- a/internal/middleware/limithandler/middleware_test.go
+++ b/internal/middleware/limithandler/middleware_test.go
@@ -201,7 +201,20 @@ func TestStreamLimitHandler(t *testing.T) {
             // id, but subsequent requests in a stream, even with the same
             // id, should bypass the concurrency limiter
             for i := 0; i < 10; i++ {
-                require.NoError(t, stream.Send(&grpc_testing.StreamingOutputCallRequest{}))
+                // Rate-limiting the stream is happening asynchronously when
+                // the server-side receives the first message. When the rate
+                // limiter then decides that the RPC call must be limited,
+                // it will close the stream.
+                //
+                // It may thus happen that we already see an EOF here in
+                // case the closed stream is received on the client-side
+                // before we have sent all requests. We thus need to special
+                // case this specific error code and will just stop sending
+                // requests in that case.
+                if err := stream.Send(&grpc_testing.StreamingOutputCallRequest{}); err != nil {
+                    require.Equal(t, io.EOF, err)
+                    break
+                }
             }
 
             require.NoError(t, stream.CloseSend())
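
For orientation, the following is a minimal caller-side sketch of the refactored limiter, based only on the signatures visible in the diff above. It is not part of the change itself: the literal limit values, the "example-repository" limiting key, and the exampleLimitedCall wrapper are illustrative assumptions, and LimitedFunc is taken to be the zero-argument func() (interface{}, error) implied by Limit returning f() directly. Passing a nil QueueTickerCreator relies on the fallback to helper.NewManualTicker() shown in acquire(), i.e. there is no automatic maximum queueing time in this sketch.

    // exampleLimitedCall is a hypothetical caller-side sketch; it assumes the
    // limithandler package is imported and uses made-up limit values and key.
    func exampleLimitedCall(ctx context.Context) (interface{}, error) {
        limiter := limithandler.NewConcurrencyLimiter(
            5,   // maxConcurrencyLimit: at most five concurrent calls per limiting key
            100, // maxQueueLength: at most 100 queued callers across all keys before ErrMaxQueueSize
            nil, // maxQueuedTickerCreator: nil falls back to a manual ticker, i.e. no automatic queue timeout
            limithandler.NewNoopConcurrencyMonitor(),
        )

        return limiter.Limit(ctx, "example-repository", func() (interface{}, error) {
            // The concurrency-limited work runs here only once the caller has passed
            // the global admission queue and acquired one of the per-key tokens.
            return "result of the limited operation", nil
        })
    }

In practice a single limiter instance would be shared across requests, since the global queue counter and the lazily created per-key token channels only provide back-pressure when all callers go through the same ConcurrencyLimiter.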