diff options
Diffstat (limited to 'internal/middleware/limithandler/concurrency_limiter.go')
-rw-r--r-- | internal/middleware/limithandler/concurrency_limiter.go | 36 |
1 files changed, 32 insertions, 4 deletions
diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go index 9c437ed9f..735fe6f6a 100644 --- a/internal/middleware/limithandler/concurrency_limiter.go +++ b/internal/middleware/limithandler/concurrency_limiter.go @@ -7,6 +7,8 @@ import ( "time" "golang.org/x/sync/semaphore" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // LimitedFunc represents a function that will be limited @@ -26,6 +28,7 @@ type ConcurrencyLimiter struct { max int64 mux *sync.Mutex monitor ConcurrencyMonitor + idempotent bool } type semaphoreReference struct { @@ -93,11 +96,21 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey string, f Limite sem := c.getSemaphore(lockKey) defer c.putSemaphore(lockKey) - err := sem.Acquire(ctx, 1) - c.monitor.Dequeued(ctx) - if err != nil { - return nil, err + if c.idempotent { + acquired := sem.TryAcquire(1) + if !acquired { + return nil, status.Error(codes.AlreadyExists, "idempotent operation already in queue") + } + + c.monitor.Dequeued(ctx) + } else { + err := sem.Acquire(ctx, 1) + c.monitor.Dequeued(ctx) + if err != nil { + return nil, err + } } + defer sem.Release(1) c.monitor.Enter(ctx, time.Since(start)) @@ -120,6 +133,21 @@ func NewLimiter(max int, monitor ConcurrencyMonitor) *ConcurrencyLimiter { } } +// NewIdempotentLimiter creates a new rate limiter +func NewIdempotentLimiter(monitor ConcurrencyMonitor) *ConcurrencyLimiter { + if monitor == nil { + monitor = &nullConcurrencyMonitor{} + } + + return &ConcurrencyLimiter{ + semaphores: make(map[string]*semaphoreReference), + max: 1, + mux: &sync.Mutex{}, + monitor: monitor, + idempotent: true, + } +} + type nullConcurrencyMonitor struct{} func (c *nullConcurrencyMonitor) Queued(ctx context.Context) {} |