diff options
author | John Cai <jcai@gitlab.com> | 2020-04-02 01:31:13 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2020-04-03 03:01:18 +0300 |
commit | 890eec7aaee35398ba25ea3be798ee962c39eae8 (patch) | |
tree | 0348fb6d504e76c1957a65afb6abb02c7ae38e72 | |
parent | 8a561867be7b08443babcf8223b9671312e47031 (diff) |
Drop extra idempotent requests in the limiterjc-idempotent-limiter
-rw-r--r-- | internal/middleware/limithandler/concurrency_limiter.go | 36 | ||||
-rw-r--r-- | internal/middleware/limithandler/limithandler.go | 9 |
2 files changed, 41 insertions, 4 deletions
diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go index 9c437ed9f..735fe6f6a 100644 --- a/internal/middleware/limithandler/concurrency_limiter.go +++ b/internal/middleware/limithandler/concurrency_limiter.go @@ -7,6 +7,8 @@ import ( "time" "golang.org/x/sync/semaphore" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // LimitedFunc represents a function that will be limited @@ -26,6 +28,7 @@ type ConcurrencyLimiter struct { max int64 mux *sync.Mutex monitor ConcurrencyMonitor + idempotent bool } type semaphoreReference struct { @@ -93,11 +96,21 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey string, f Limite sem := c.getSemaphore(lockKey) defer c.putSemaphore(lockKey) - err := sem.Acquire(ctx, 1) - c.monitor.Dequeued(ctx) - if err != nil { - return nil, err + if c.idempotent { + acquired := sem.TryAcquire(1) + if !acquired { + return nil, status.Error(codes.AlreadyExists, "idempotent operation already in queue") + } + + c.monitor.Dequeued(ctx) + } else { + err := sem.Acquire(ctx, 1) + c.monitor.Dequeued(ctx) + if err != nil { + return nil, err + } } + defer sem.Release(1) c.monitor.Enter(ctx, time.Since(start)) @@ -120,6 +133,21 @@ func NewLimiter(max int, monitor ConcurrencyMonitor) *ConcurrencyLimiter { } } +// NewIdempotentLimiter creates a new rate limiter +func NewIdempotentLimiter(monitor ConcurrencyMonitor) *ConcurrencyLimiter { + if monitor == nil { + monitor = &nullConcurrencyMonitor{} + } + + return &ConcurrencyLimiter{ + semaphores: make(map[string]*semaphoreReference), + max: 1, + mux: &sync.Mutex{}, + monitor: monitor, + idempotent: true, + } +} + type nullConcurrencyMonitor struct{} func (c *nullConcurrencyMonitor) Queued(ctx context.Context) {} diff --git a/internal/middleware/limithandler/limithandler.go b/internal/middleware/limithandler/limithandler.go index 7d8d6ab8f..00dc96450 100644 --- a/internal/middleware/limithandler/limithandler.go +++ b/internal/middleware/limithandler/limithandler.go @@ -108,6 +108,10 @@ func createLimiterConfig() map[string]*ConcurrencyLimiter { result[fullMethodName] = NewLimiter(max, NewPromMonitor("gitaly", fullMethodName)) } + for fullMethodName, _ := range idempotentRPCs { + result[fullMethodName] = NewIdempotentLimiter(NewPromMonitor("gitaly", fullMethodName)) + } + return result } @@ -115,3 +119,8 @@ func createLimiterConfig() map[string]*ConcurrencyLimiter { func SetMaxRepoConcurrency(config map[string]int) { maxConcurrencyPerRepoPerRPC = config } + +var idempotentRPCs = map[string]struct{}{ + "/gitaly.RepositoryService/ReplicateRepository": {}, + "/gitaly.RepositoryService/OptimizeRepository": {}, +} |