Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2023-01-05 12:20:40 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2023-01-06 16:51:59 +0300
commit81d969aa2701d173aedf1e19011450c5b3553c0d (patch)
treeb7a2411cee2930338b6204859abb9dc6d1dbe12a
parentb5171eee6027658c4c9315f638c4515fb745759b (diff)
limithandler: Rename `semaphoreReference` structure
The `semaphoreReference` structure is a bit weirdly named as it is not in fact a reference to a semaphore, but contains the semaphore logic itself. Rename it to `keyedConcurrencyLimiter` to clarify that it is intended to host the logic to limit concurrency per keyed resource.
-rw-r--r--internal/middleware/limithandler/concurrency_limiter.go33
1 files changed, 17 insertions, 16 deletions
diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go
index 5f6f1285e..e1e842c9f 100644
--- a/internal/middleware/limithandler/concurrency_limiter.go
+++ b/internal/middleware/limithandler/concurrency_limiter.go
@@ -26,13 +26,14 @@ var ErrMaxQueueSize = errors.New("maximum queue size reached")
// QueueTickerCreator is a function that provides a ticker
type QueueTickerCreator func() helper.Ticker
-type semaphoreReference struct {
+// keyedConcurrencyLimiter is a concurrency limiter that applies to a specific keyed resource.
+type keyedConcurrencyLimiter struct {
tokens chan struct{}
count int
newTicker QueueTickerCreator
}
-func (sem *semaphoreReference) acquire(ctx context.Context) error {
+func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context) error {
var ticker helper.Ticker
if sem.newTicker != nil {
@@ -54,11 +55,11 @@ func (sem *semaphoreReference) acquire(ctx context.Context) error {
}
}
-func (sem *semaphoreReference) release() { <-sem.tokens }
+func (sem *keyedConcurrencyLimiter) release() { <-sem.tokens }
// ConcurrencyLimiter contains rate limiter state
type ConcurrencyLimiter struct {
- semaphores map[string]*semaphoreReference
+ limitsByKey map[string]*keyedConcurrencyLimiter
// maxPerKey is the maximum number of concurrent operations
// per lockKey
maxPerKey int64
@@ -79,7 +80,7 @@ func NewConcurrencyLimiter(perKeyLimit, globalLimit int, maxWaitTickerGetter Que
}
return &ConcurrencyLimiter{
- semaphores: make(map[string]*semaphoreReference),
+ limitsByKey: make(map[string]*keyedConcurrencyLimiter),
maxPerKey: int64(perKeyLimit),
queuedLimit: int64(globalLimit),
monitor: monitor,
@@ -114,8 +115,8 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey string, f Limite
start := time.Now()
c.monitor.Queued(ctx)
- sem := c.getSemaphore(lockKey)
- defer c.putSemaphore(lockKey)
+ sem := c.getConcurrencyLimit(lockKey)
+ defer c.putConcurrencyLimit(lockKey)
err := sem.acquire(ctx)
c.queueDec(&decremented)
@@ -143,26 +144,26 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey string, f Limite
}
// Lazy create a semaphore for the given key
-func (c *ConcurrencyLimiter) getSemaphore(lockKey string) *semaphoreReference {
+func (c *ConcurrencyLimiter) getConcurrencyLimit(lockKey string) *keyedConcurrencyLimiter {
c.mux.Lock()
defer c.mux.Unlock()
- if c.semaphores[lockKey] == nil {
- c.semaphores[lockKey] = &semaphoreReference{
+ if c.limitsByKey[lockKey] == nil {
+ c.limitsByKey[lockKey] = &keyedConcurrencyLimiter{
tokens: make(chan struct{}, c.maxPerKey),
newTicker: c.maxWaitTickerGetter,
}
}
- c.semaphores[lockKey].count++
- return c.semaphores[lockKey]
+ c.limitsByKey[lockKey].count++
+ return c.limitsByKey[lockKey]
}
-func (c *ConcurrencyLimiter) putSemaphore(lockKey string) {
+func (c *ConcurrencyLimiter) putConcurrencyLimit(lockKey string) {
c.mux.Lock()
defer c.mux.Unlock()
- ref := c.semaphores[lockKey]
+ ref := c.limitsByKey[lockKey]
if ref == nil {
panic("semaphore should be in the map")
}
@@ -173,7 +174,7 @@ func (c *ConcurrencyLimiter) putSemaphore(lockKey string) {
ref.count--
if ref.count == 0 {
- delete(c.semaphores, lockKey)
+ delete(c.limitsByKey, lockKey)
}
}
@@ -181,7 +182,7 @@ func (c *ConcurrencyLimiter) countSemaphores() int {
c.mux.RLock()
defer c.mux.RUnlock()
- return len(c.semaphores)
+ return len(c.limitsByKey)
}
func (c *ConcurrencyLimiter) queueInc(ctx context.Context) error {