Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2020-04-02 01:31:13 +0300
committerJohn Cai <jcai@gitlab.com>2020-04-03 03:01:18 +0300
commit890eec7aaee35398ba25ea3be798ee962c39eae8 (patch)
tree0348fb6d504e76c1957a65afb6abb02c7ae38e72
parent8a561867be7b08443babcf8223b9671312e47031 (diff)
Drop extra idempotent requests in the limiterjc-idempotent-limiter
-rw-r--r--internal/middleware/limithandler/concurrency_limiter.go36
-rw-r--r--internal/middleware/limithandler/limithandler.go9
2 files changed, 41 insertions, 4 deletions
diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go
index 9c437ed9f..735fe6f6a 100644
--- a/internal/middleware/limithandler/concurrency_limiter.go
+++ b/internal/middleware/limithandler/concurrency_limiter.go
@@ -7,6 +7,8 @@ import (
"time"
"golang.org/x/sync/semaphore"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
)
// LimitedFunc represents a function that will be limited
@@ -26,6 +28,7 @@ type ConcurrencyLimiter struct {
max int64
mux *sync.Mutex
monitor ConcurrencyMonitor
+ idempotent bool
}
type semaphoreReference struct {
@@ -93,11 +96,21 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey string, f Limite
sem := c.getSemaphore(lockKey)
defer c.putSemaphore(lockKey)
- err := sem.Acquire(ctx, 1)
- c.monitor.Dequeued(ctx)
- if err != nil {
- return nil, err
+ if c.idempotent {
+ acquired := sem.TryAcquire(1)
+ if !acquired {
+ return nil, status.Error(codes.AlreadyExists, "idempotent operation already in queue")
+ }
+
+ c.monitor.Dequeued(ctx)
+ } else {
+ err := sem.Acquire(ctx, 1)
+ c.monitor.Dequeued(ctx)
+ if err != nil {
+ return nil, err
+ }
}
+
defer sem.Release(1)
c.monitor.Enter(ctx, time.Since(start))
@@ -120,6 +133,21 @@ func NewLimiter(max int, monitor ConcurrencyMonitor) *ConcurrencyLimiter {
}
}
+// NewIdempotentLimiter creates a new rate limiter
+func NewIdempotentLimiter(monitor ConcurrencyMonitor) *ConcurrencyLimiter {
+ if monitor == nil {
+ monitor = &nullConcurrencyMonitor{}
+ }
+
+ return &ConcurrencyLimiter{
+ semaphores: make(map[string]*semaphoreReference),
+ max: 1,
+ mux: &sync.Mutex{},
+ monitor: monitor,
+ idempotent: true,
+ }
+}
+
type nullConcurrencyMonitor struct{}
func (c *nullConcurrencyMonitor) Queued(ctx context.Context) {}
diff --git a/internal/middleware/limithandler/limithandler.go b/internal/middleware/limithandler/limithandler.go
index 7d8d6ab8f..00dc96450 100644
--- a/internal/middleware/limithandler/limithandler.go
+++ b/internal/middleware/limithandler/limithandler.go
@@ -108,6 +108,10 @@ func createLimiterConfig() map[string]*ConcurrencyLimiter {
result[fullMethodName] = NewLimiter(max, NewPromMonitor("gitaly", fullMethodName))
}
+ for fullMethodName, _ := range idempotentRPCs {
+ result[fullMethodName] = NewIdempotentLimiter(NewPromMonitor("gitaly", fullMethodName))
+ }
+
return result
}
@@ -115,3 +119,8 @@ func createLimiterConfig() map[string]*ConcurrencyLimiter {
func SetMaxRepoConcurrency(config map[string]int) {
maxConcurrencyPerRepoPerRPC = config
}
+
+var idempotentRPCs = map[string]struct{}{
+ "/gitaly.RepositoryService/ReplicateRepository": {},
+ "/gitaly.RepositoryService/OptimizeRepository": {},
+}