Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorQuang-Minh Nguyen <qmnguyen@gitlab.com>2023-09-12 11:50:28 +0300
committerQuang-Minh Nguyen <qmnguyen@gitlab.com>2023-09-13 17:46:10 +0300
commit320011552234b53b18d229b07bf67504aadd52cb (patch)
treec142bebdf5ccab3de47b283efd8cb851d0d7df77 /internal/limiter
parent53846028eb49e0cb32fb8ca43cba3908444bf22f (diff)
limiter: Generalize concurrency limiter's buffers used for semaphore
The concurrency limiter controls the concurrency and queueing mechanism using two buffered channels. Naturally, we can consider them as two weighted semaphores. This commit extracts the core operations of those two semaphores to a staticSemaphore struct. This extraction also standardizes the interface so that the forward commits can swap in another semaphore implementation.
Diffstat (limited to 'internal/limiter')
-rw-r--r--internal/limiter/concurrency_limiter.go134
1 files changed, 89 insertions, 45 deletions
diff --git a/internal/limiter/concurrency_limiter.go b/internal/limiter/concurrency_limiter.go
index e0d09cf39..656b7b4c5 100644
--- a/internal/limiter/concurrency_limiter.go
+++ b/internal/limiter/concurrency_limiter.go
@@ -14,6 +14,60 @@ import (
"google.golang.org/protobuf/types/known/durationpb"
)
+// semaphorer is an interface of a specific-purpose semaphore used for concurrency control.
+type semaphorer interface {
+ Acquire(ctx context.Context) error
+ TryAcquire() error
+ Release()
+ Count() int
+}
+
+// staticSemaphore implements semaphorer interface. It is a wrapper for a buffer channel. When a caller wants to acquire
+// the semaphore, a token is pushed to the channel. In contrast, when it wants to release the semaphore, it pulls one
+// token from the channel.
+type staticSemaphore struct {
+ queue chan struct{}
+}
+
+// newStaticSemaphore initializes and returns a staticSemaphore instance.
+func newStaticSemaphore(size int) *staticSemaphore {
+ return &staticSemaphore{queue: make(chan struct{}, size)}
+}
+
+// Acquire acquires the semaphore. The caller is blocked until there is a available resource or the context is cancelled
+// or the ticker ticks.
+func (s *staticSemaphore) Acquire(ctx context.Context) error {
+ select {
+ case s.queue <- struct{}{}:
+ return nil
+ case <-ctx.Done():
+ if errors.Is(ctx.Err(), context.DeadlineExceeded) {
+ return ErrMaxQueueTime
+ }
+ return ctx.Err()
+ }
+}
+
+// TryAcquire tries to acquire the semaphore. If it fails to do so, it will immediately return ErrMaxQueueSize and no change is made to the semaphore.
+func (s *staticSemaphore) TryAcquire() error {
+ select {
+ case s.queue <- struct{}{}:
+ return nil
+ default:
+ return ErrMaxQueueSize
+ }
+}
+
+// Release releases the semaphore by pushing a token back to the underlying channel.
+func (s *staticSemaphore) Release() {
+ <-s.queue
+}
+
+// Count returns the amount of current concurrent access to the semaphore.
+func (s *staticSemaphore) Count() int {
+ return len(s.queue)
+}
+
const (
// TypePerRPC is a concurrency limiter whose key is the full method of gRPC server. All
// requests of the same method shares the concurrency limit.
@@ -40,12 +94,12 @@ type keyedConcurrencyLimiter struct {
maxQueueWait time.Duration
setWaitTimeoutContext func() context.Context
- // concurrencyTokens is the channel of available concurrency tokens, where every token
+ // concurrencyTokens is the counting semaphore to control available concurrency tokens, where every token
// allows one concurrent call to the concurrency-limited function.
- concurrencyTokens chan struct{}
- // queueTokens is the channel of available queue tokens, where every token allows one
+ concurrencyTokens semaphorer
+ // queueTokens is the counting semaphore to control available queue tokens, where every token allows one
// concurrent call to be admitted to the queue.
- queueTokens chan struct{}
+ queueTokens semaphorer
}
// acquire tries to acquire the semaphore. It may fail if the admission queue is full or if the max
@@ -56,34 +110,32 @@ func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context, limitingKey str
// callers may wait for the concurrency token at the same time. If there are no more
// queueing tokens then this indicates that the queue is full and we thus return an
// error immediately.
- select {
- case sem.queueTokens <- struct{}{}:
- // We have acquired a queueing token, so we need to release it if acquiring
- // the concurrency token fails. If we succeed to acquire the concurrency
- // token though then we retain the queueing token until the caller signals
- // that the concurrency-limited function has finished. As a consequence the
- // queue token is returned together with the concurrency token.
- //
- // A simpler model would be to just have `maxQueueLength` many queueing
- // tokens. But this would add concurrency-limiting when acquiring the queue
- // token itself, which is not what we want to do. Instead, we want to admit
- // as many callers into the queue as the queue length permits plus the
- // number of available concurrency tokens allows.
- defer func() {
- if returnedErr != nil {
- <-sem.queueTokens
- }
- }()
- default:
- return ErrMaxQueueSize
+ if err := sem.queueTokens.TryAcquire(); err != nil {
+ return err
}
+ // We have acquired a queueing token, so we need to release it if acquiring
+ // the concurrency token fails. If we succeed to acquire the concurrency
+ // token though then we retain the queueing token until the caller signals
+ // that the concurrency-limited function has finished. As a consequence the
+ // queue token is returned together with the concurrency token.
+ //
+ // A simpler model would be to just have `maxQueueLength` many queueing
+ // tokens. But this would add concurrency-limiting when acquiring the queue
+ // token itself, which is not what we want to do. Instead, we want to admit
+ // as many callers into the queue as the queue length permits plus the
+ // number of available concurrency tokens allows.
+ defer func() {
+ if returnedErr != nil {
+ sem.queueTokens.Release()
+ }
+ }()
}
// We are queued now, so let's tell the monitor. Furthermore, even though we're still
// holding the queueing token when this function exits successfully we also tell the monitor
// that we have exited the queue. It is only an implementation detail anyway that we hold on
// to the token, so the monitor shouldn't care about that.
- sem.monitor.Queued(ctx, limitingKey, len(sem.queueTokens))
+ sem.monitor.Queued(ctx, limitingKey, sem.queueLength())
defer sem.monitor.Dequeued(ctx)
if sem.maxQueueWait != 0 {
@@ -97,23 +149,15 @@ func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context, limitingKey str
}
// Try to acquire the concurrency token now that we're in the queue.
- select {
- case sem.concurrencyTokens <- struct{}{}:
- return nil
- case <-ctx.Done():
- if errors.Is(ctx.Err(), context.DeadlineExceeded) {
- return ErrMaxQueueTime
- }
- return ctx.Err()
- }
+ return sem.concurrencyTokens.Acquire(ctx)
}
// release releases the acquired tokens.
func (sem *keyedConcurrencyLimiter) release() {
if sem.queueTokens != nil {
- <-sem.queueTokens
+ sem.queueTokens.Release()
}
- <-sem.concurrencyTokens
+ sem.concurrencyTokens.Release()
}
// queueLength returns the length of the queue waiting for tokens.
@@ -121,23 +165,23 @@ func (sem *keyedConcurrencyLimiter) queueLength() int {
if sem.queueTokens == nil {
return 0
}
- return len(sem.queueTokens) - len(sem.concurrencyTokens)
+ return sem.queueTokens.Count() - sem.concurrencyTokens.Count()
}
// inProgress returns the number of in-progress tokens.
func (sem *keyedConcurrencyLimiter) inProgress() int {
- return len(sem.concurrencyTokens)
+ return sem.concurrencyTokens.Count()
}
// ConcurrencyLimiter contains rate limiter state.
type ConcurrencyLimiter struct {
// maxConcurrencyLimit is the maximum number of concurrent calls to the limited function.
// This limit is per key.
- maxConcurrencyLimit int64
+ maxConcurrencyLimit int
// maxQueueLength is the maximum number of operations allowed to wait in a queued state.
// This limit is global and applies before the concurrency limit. Subsequent incoming
// operations will be rejected with an error immediately.
- maxQueueLength int64
+ maxQueueLength int
// maxQueueWait is a time duration of an operation allowed to wait in the queue.
maxQueueWait time.Duration
// SetWaitTimeoutContext is a function for setting up timeout context. If this is nill, context.WithTimeout is
@@ -162,8 +206,8 @@ func NewConcurrencyLimiter(maxConcurrencyLimit, maxQueueLength int, maxQueueWait
}
return &ConcurrencyLimiter{
- maxConcurrencyLimit: int64(maxConcurrencyLimit),
- maxQueueLength: int64(maxQueueLength),
+ maxConcurrencyLimit: maxConcurrencyLimit,
+ maxQueueLength: maxQueueLength,
maxQueueWait: maxQueueWait,
monitor: monitor,
limitsByKey: make(map[string]*keyedConcurrencyLimiter),
@@ -233,16 +277,16 @@ func (c *ConcurrencyLimiter) getConcurrencyLimit(limitingKey string) *keyedConcu
// Set up the queue tokens in case a maximum queue length was requested. As the
// queue tokens are kept during the whole lifetime of the concurrency-limited
// function we add the concurrency tokens to the number of available token.
- var queueTokens chan struct{}
+ var queueTokens semaphorer
if c.maxQueueLength > 0 {
- queueTokens = make(chan struct{}, c.maxConcurrencyLimit+c.maxQueueLength)
+ queueTokens = newStaticSemaphore(c.maxConcurrencyLimit + c.maxQueueLength)
}
c.limitsByKey[limitingKey] = &keyedConcurrencyLimiter{
monitor: c.monitor,
maxQueueWait: c.maxQueueWait,
setWaitTimeoutContext: c.SetWaitTimeoutContext,
- concurrencyTokens: make(chan struct{}, c.maxConcurrencyLimit),
+ concurrencyTokens: newStaticSemaphore(c.maxConcurrencyLimit),
queueTokens: queueTokens,
}
}