diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2023-01-05 12:20:40 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2023-01-06 16:51:59 +0300 |
commit | 81d969aa2701d173aedf1e19011450c5b3553c0d (patch) | |
tree | b7a2411cee2930338b6204859abb9dc6d1dbe12a | |
parent | b5171eee6027658c4c9315f638c4515fb745759b (diff) |
limithandler: Rename `semaphoreReference` structure
The `semaphoreReference` structure is a bit weirdly named as it is not
in fact a reference to a semaphore, but contains the semaphore logic
itself. Rename it to `keyedConcurrencyLimiter` to clarify that it is
intended to host the logic to limit concurrency per keyed resource.
-rw-r--r-- | internal/middleware/limithandler/concurrency_limiter.go | 33 |
1 files changed, 17 insertions, 16 deletions
diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go index 5f6f1285e..e1e842c9f 100644 --- a/internal/middleware/limithandler/concurrency_limiter.go +++ b/internal/middleware/limithandler/concurrency_limiter.go @@ -26,13 +26,14 @@ var ErrMaxQueueSize = errors.New("maximum queue size reached") // QueueTickerCreator is a function that provides a ticker type QueueTickerCreator func() helper.Ticker -type semaphoreReference struct { +// keyedConcurrencyLimiter is a concurrency limiter that applies to a specific keyed resource. +type keyedConcurrencyLimiter struct { tokens chan struct{} count int newTicker QueueTickerCreator } -func (sem *semaphoreReference) acquire(ctx context.Context) error { +func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context) error { var ticker helper.Ticker if sem.newTicker != nil { @@ -54,11 +55,11 @@ func (sem *semaphoreReference) acquire(ctx context.Context) error { } } -func (sem *semaphoreReference) release() { <-sem.tokens } +func (sem *keyedConcurrencyLimiter) release() { <-sem.tokens } // ConcurrencyLimiter contains rate limiter state type ConcurrencyLimiter struct { - semaphores map[string]*semaphoreReference + limitsByKey map[string]*keyedConcurrencyLimiter // maxPerKey is the maximum number of concurrent operations // per lockKey maxPerKey int64 @@ -79,7 +80,7 @@ func NewConcurrencyLimiter(perKeyLimit, globalLimit int, maxWaitTickerGetter Que } return &ConcurrencyLimiter{ - semaphores: make(map[string]*semaphoreReference), + limitsByKey: make(map[string]*keyedConcurrencyLimiter), maxPerKey: int64(perKeyLimit), queuedLimit: int64(globalLimit), monitor: monitor, @@ -114,8 +115,8 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey string, f Limite start := time.Now() c.monitor.Queued(ctx) - sem := c.getSemaphore(lockKey) - defer c.putSemaphore(lockKey) + sem := c.getConcurrencyLimit(lockKey) + defer c.putConcurrencyLimit(lockKey) err := sem.acquire(ctx) c.queueDec(&decremented) @@ -143,26 +144,26 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey string, f Limite } // Lazy create a semaphore for the given key -func (c *ConcurrencyLimiter) getSemaphore(lockKey string) *semaphoreReference { +func (c *ConcurrencyLimiter) getConcurrencyLimit(lockKey string) *keyedConcurrencyLimiter { c.mux.Lock() defer c.mux.Unlock() - if c.semaphores[lockKey] == nil { - c.semaphores[lockKey] = &semaphoreReference{ + if c.limitsByKey[lockKey] == nil { + c.limitsByKey[lockKey] = &keyedConcurrencyLimiter{ tokens: make(chan struct{}, c.maxPerKey), newTicker: c.maxWaitTickerGetter, } } - c.semaphores[lockKey].count++ - return c.semaphores[lockKey] + c.limitsByKey[lockKey].count++ + return c.limitsByKey[lockKey] } -func (c *ConcurrencyLimiter) putSemaphore(lockKey string) { +func (c *ConcurrencyLimiter) putConcurrencyLimit(lockKey string) { c.mux.Lock() defer c.mux.Unlock() - ref := c.semaphores[lockKey] + ref := c.limitsByKey[lockKey] if ref == nil { panic("semaphore should be in the map") } @@ -173,7 +174,7 @@ func (c *ConcurrencyLimiter) putSemaphore(lockKey string) { ref.count-- if ref.count == 0 { - delete(c.semaphores, lockKey) + delete(c.limitsByKey, lockKey) } } @@ -181,7 +182,7 @@ func (c *ConcurrencyLimiter) countSemaphores() int { c.mux.RLock() defer c.mux.RUnlock() - return len(c.semaphores) + return len(c.limitsByKey) } func (c *ConcurrencyLimiter) queueInc(ctx context.Context) error { |