Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'internal/middleware/limithandler/concurrency_limiter.go')
-rw-r--r--internal/middleware/limithandler/concurrency_limiter.go36
1 files changed, 32 insertions, 4 deletions
diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go
index 9c437ed9f..735fe6f6a 100644
--- a/internal/middleware/limithandler/concurrency_limiter.go
+++ b/internal/middleware/limithandler/concurrency_limiter.go
@@ -7,6 +7,8 @@ import (
"time"
"golang.org/x/sync/semaphore"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
)
// LimitedFunc represents a function that will be limited
@@ -26,6 +28,7 @@ type ConcurrencyLimiter struct {
max int64
mux *sync.Mutex
monitor ConcurrencyMonitor
+ idempotent bool
}
type semaphoreReference struct {
@@ -93,11 +96,21 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey string, f Limite
sem := c.getSemaphore(lockKey)
defer c.putSemaphore(lockKey)
- err := sem.Acquire(ctx, 1)
- c.monitor.Dequeued(ctx)
- if err != nil {
- return nil, err
+ if c.idempotent {
+ acquired := sem.TryAcquire(1)
+ if !acquired {
+ return nil, status.Error(codes.AlreadyExists, "idempotent operation already in queue")
+ }
+
+ c.monitor.Dequeued(ctx)
+ } else {
+ err := sem.Acquire(ctx, 1)
+ c.monitor.Dequeued(ctx)
+ if err != nil {
+ return nil, err
+ }
}
+
defer sem.Release(1)
c.monitor.Enter(ctx, time.Since(start))
@@ -120,6 +133,21 @@ func NewLimiter(max int, monitor ConcurrencyMonitor) *ConcurrencyLimiter {
}
}
+// NewIdempotentLimiter creates a new rate limiter
+func NewIdempotentLimiter(monitor ConcurrencyMonitor) *ConcurrencyLimiter {
+ if monitor == nil {
+ monitor = &nullConcurrencyMonitor{}
+ }
+
+ return &ConcurrencyLimiter{
+ semaphores: make(map[string]*semaphoreReference),
+ max: 1,
+ mux: &sync.Mutex{},
+ monitor: monitor,
+ idempotent: true,
+ }
+}
+
type nullConcurrencyMonitor struct{}
func (c *nullConcurrencyMonitor) Queued(ctx context.Context) {}