Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2022-01-20 00:46:50 +0300
committerJohn Cai <jcai@gitlab.com>2022-02-01 07:22:29 +0300
commit023fee7de6eb7ea428fb3e089bffabe51b8c05e7 (patch)
tree72511e135ffe696f3723a2b714ae70cb84710e80
parent2b9720a51e6f447af10327b6dd275e91f2029bc7 (diff)
limithandler: enable max queue wait time
Concurrency queues can grow boundlessly. This can lead to production incidents. Add a timeout for how long a request can remain in a concurrency queue for. Changelog: added
-rw-r--r--internal/gitaly/config/config.go3
-rw-r--r--internal/metadata/featureflag/ff_max_queue_wait_time.go5
-rw-r--r--internal/middleware/limithandler/concurrency_limiter.go50
-rw-r--r--internal/middleware/limithandler/concurrency_limiter_test.go138
-rw-r--r--internal/middleware/limithandler/middleware.go28
-rw-r--r--internal/testhelper/testhelper.go3
6 files changed, 207 insertions, 20 deletions
diff --git a/internal/gitaly/config/config.go b/internal/gitaly/config/config.go
index 50be0489a..9bd46f2df 100644
--- a/internal/gitaly/config/config.go
+++ b/internal/gitaly/config/config.go
@@ -143,6 +143,9 @@ type Concurrency struct {
// MaxQueueSize is the maximum number of requests in the queue waiting to be picked up
// after which subsequent requests will return with an error.
MaxQueueSize int `toml:"max_queue_size"`
+ // MaxQueueWait is the maximum time a request can remain in the concurrency queue
+ // waiting to be picked up by Gitaly
+ MaxQueueWait Duration `toml:"max_queue_wait"`
}
// StreamCacheConfig contains settings for a streamcache instance.
diff --git a/internal/metadata/featureflag/ff_max_queue_wait_time.go b/internal/metadata/featureflag/ff_max_queue_wait_time.go
new file mode 100644
index 000000000..2cd281a5b
--- /dev/null
+++ b/internal/metadata/featureflag/ff_max_queue_wait_time.go
@@ -0,0 +1,5 @@
+package featureflag
+
+// ConcurrencyQueueMaxWait will enable the concurrency limiter to drop requests that are waiting in
+// the concurrency queue for longer than the configured time.
+var ConcurrencyQueueMaxWait = NewFeatureFlag("concurrency_queue_max_wait", false)
diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go
index e7ccf1e59..92e5f965d 100644
--- a/internal/middleware/limithandler/concurrency_limiter.go
+++ b/internal/middleware/limithandler/concurrency_limiter.go
@@ -7,12 +7,20 @@ import (
"sync"
"time"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
)
+// ErrMaxQueueTime indicates a request has reached the maximum time allowed to wait in the
+// concurrency queue.
+var ErrMaxQueueTime = errors.New("maximum time in concurrency queue reached")
+
// LimitedFunc represents a function that will be limited
type LimitedFunc func() (resp interface{}, err error)
+// QueueTickerCreator is a function that provides a ticker
+type QueueTickerCreator func() helper.Ticker
+
// ConcurrencyLimiter contains rate limiter state
type ConcurrencyLimiter struct {
semaphores map[string]*semaphoreReference
@@ -23,20 +31,36 @@ type ConcurrencyLimiter struct {
queued int64
// queuedLimit is the maximum number of operations allowed to wait in a queued state.
// subsequent incoming operations will fail with an error.
- queuedLimit int64
- monitor ConcurrencyMonitor
- mux sync.RWMutex
+ queuedLimit int64
+ monitor ConcurrencyMonitor
+ mux sync.RWMutex
+ maxWaitTickerGetter QueueTickerCreator
}
type semaphoreReference struct {
- tokens chan struct{}
- count int
+ tokens chan struct{}
+ count int
+ newTicker QueueTickerCreator
}
func (sem *semaphoreReference) acquire(ctx context.Context) error {
+ var ticker helper.Ticker
+
+ if featureflag.ConcurrencyQueueMaxWait.IsEnabled(ctx) &&
+ sem.newTicker != nil {
+ ticker = sem.newTicker()
+ } else {
+ ticker = helper.Ticker(helper.NewManualTicker())
+ }
+
+ defer ticker.Stop()
+ ticker.Reset()
+
select {
case sem.tokens <- struct{}{}:
return nil
+ case <-ticker.C():
+ return ErrMaxQueueTime
case <-ctx.Done():
return ctx.Err()
}
@@ -50,7 +74,10 @@ func (c *ConcurrencyLimiter) getSemaphore(lockKey string) *semaphoreReference {
defer c.mux.Unlock()
if c.semaphores[lockKey] == nil {
- c.semaphores[lockKey] = &semaphoreReference{tokens: make(chan struct{}, c.maxPerKey)}
+ c.semaphores[lockKey] = &semaphoreReference{
+ tokens: make(chan struct{}, c.maxPerKey),
+ newTicker: c.maxWaitTickerGetter,
+ }
}
c.semaphores[lockKey].count++
@@ -143,15 +170,16 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey string, f Limite
}
// NewLimiter creates a new rate limiter
-func NewLimiter(perKeyLimit, globalLimit int, monitor ConcurrencyMonitor) *ConcurrencyLimiter {
+func NewLimiter(perKeyLimit, globalLimit int, maxWaitTickerGetter QueueTickerCreator, monitor ConcurrencyMonitor) *ConcurrencyLimiter {
if monitor == nil {
monitor = &nullConcurrencyMonitor{}
}
return &ConcurrencyLimiter{
- semaphores: make(map[string]*semaphoreReference),
- maxPerKey: int64(perKeyLimit),
- queuedLimit: int64(globalLimit),
- monitor: monitor,
+ semaphores: make(map[string]*semaphoreReference),
+ maxPerKey: int64(perKeyLimit),
+ queuedLimit: int64(globalLimit),
+ monitor: monitor,
+ maxWaitTickerGetter: maxWaitTickerGetter,
}
}
diff --git a/internal/middleware/limithandler/concurrency_limiter_test.go b/internal/middleware/limithandler/concurrency_limiter_test.go
index 989f9478a..e581b17e2 100644
--- a/internal/middleware/limithandler/concurrency_limiter_test.go
+++ b/internal/middleware/limithandler/concurrency_limiter_test.go
@@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
)
@@ -138,7 +139,12 @@ func TestLimiter(t *testing.T) {
gauge := &counter{}
- limiter := NewLimiter(tt.maxConcurrency, 0, gauge)
+ limiter := NewLimiter(
+ tt.maxConcurrency,
+ 0,
+ nil,
+ gauge,
+ )
wg := sync.WaitGroup{}
wg.Add(tt.concurrency)
@@ -208,16 +214,16 @@ func TestLimiter(t *testing.T) {
}
}
-type blockingCounter struct {
+type blockingQueueCounter struct {
counter
- ch chan struct{}
+ queuedCh chan struct{}
}
// Queued will block on a channel. We need a way to synchronize on when a Limiter has attempted to acquire
// a semaphore but has not yet. The caller can use the channel to wait for many requests to be queued
-func (b *blockingCounter) Queued(_ context.Context) {
- b.ch <- struct{}{}
+func (b *blockingQueueCounter) Queued(_ context.Context) {
+ b.queuedCh <- struct{}{}
}
func TestConcurrencyLimiter_queueLimit(t *testing.T) {
@@ -247,9 +253,9 @@ func TestConcurrencyLimiter_queueLimit(t *testing.T) {
)
monitorCh := make(chan struct{})
- gauge := &blockingCounter{ch: monitorCh}
+ gauge := &blockingQueueCounter{queuedCh: monitorCh}
ch := make(chan struct{})
- limiter := NewLimiter(1, queueLimit, gauge)
+ limiter := NewLimiter(1, queueLimit, nil, gauge)
// occupied with one live request that takes a long time to complete
go func() {
@@ -308,3 +314,121 @@ func TestConcurrencyLimiter_queueLimit(t *testing.T) {
})
}
}
+
+type blockingDequeueCounter struct {
+ counter
+
+ dequeuedCh chan struct{}
+}
+
+// Dequeued will block on a channel. We need a way to synchronize on when a Limiter has successfully
+// acquired a semaphore but has not yet. The caller can use the channel to wait for many requests to
+// be queued
+func (b *blockingDequeueCounter) Dequeued(context.Context) {
+ b.dequeuedCh <- struct{}{}
+}
+
+func TestLimitConcurrency_queueWaitTime(t *testing.T) {
+ ctx := testhelper.Context(t)
+
+ t.Run("with feature flag", func(t *testing.T) {
+ ctx = featureflag.IncomingCtxWithFeatureFlag(
+ ctx,
+ featureflag.ConcurrencyQueueMaxWait,
+ true,
+ )
+
+ ticker := helper.NewManualTicker()
+
+ dequeuedCh := make(chan struct{})
+ gauge := &blockingDequeueCounter{dequeuedCh: dequeuedCh}
+ limiter := NewLimiter(
+ 1,
+ 0,
+ func() helper.Ticker {
+ return ticker
+ },
+ gauge,
+ )
+
+ ch := make(chan struct{})
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ _, err := limiter.Limit(ctx, "key", func() (interface{}, error) {
+ <-ch
+ return nil, nil
+ })
+ require.NoError(t, err)
+ wg.Done()
+ }()
+
+ <-dequeuedCh
+
+ ticker.Tick()
+
+ errChan := make(chan error)
+ go func() {
+ _, err := limiter.Limit(ctx, "key", func() (interface{}, error) {
+ return nil, nil
+ })
+ errChan <- err
+ }()
+
+ <-dequeuedCh
+ err := <-errChan
+
+ assert.Equal(t, ErrMaxQueueTime, err)
+
+ close(ch)
+ wg.Wait()
+ })
+
+ t.Run("without feature flag", func(t *testing.T) {
+ ctx = featureflag.IncomingCtxWithFeatureFlag(
+ ctx,
+ featureflag.ConcurrencyQueueMaxWait,
+ false,
+ )
+
+ ticker := helper.NewManualTicker()
+
+ dequeuedCh := make(chan struct{})
+ gauge := &blockingDequeueCounter{dequeuedCh: dequeuedCh}
+ limiter := NewLimiter(
+ 1,
+ 0,
+ func() helper.Ticker {
+ return ticker
+ },
+ gauge,
+ )
+
+ ch := make(chan struct{})
+ go func() {
+ _, err := limiter.Limit(ctx, "key", func() (interface{}, error) {
+ <-ch
+ return nil, nil
+ })
+ require.NoError(t, err)
+ }()
+
+ <-dequeuedCh
+
+ ticker.Tick()
+
+ errChan := make(chan error)
+ go func() {
+ _, err := limiter.Limit(ctx, "key", func() (interface{}, error) {
+ return nil, nil
+ })
+ errChan <- err
+ }()
+
+ close(ch)
+ <-dequeuedCh
+ err := <-errChan
+
+ assert.NoError(t, err)
+ })
+}
diff --git a/internal/middleware/limithandler/middleware.go b/internal/middleware/limithandler/middleware.go
index 48ec48ca4..347ceccf8 100644
--- a/internal/middleware/limithandler/middleware.go
+++ b/internal/middleware/limithandler/middleware.go
@@ -7,6 +7,7 @@ import (
grpcmwtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
"github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"google.golang.org/grpc"
)
@@ -119,14 +120,37 @@ func (c *LimiterMiddleware) StreamInterceptor() grpc.StreamServerInterceptor {
func createLimiterConfig(middleware *LimiterMiddleware, cfg config.Cfg) map[string]*ConcurrencyLimiter {
result := make(map[string]*ConcurrencyLimiter)
+
+ newTickerFunc := func() helper.Ticker {
+ return helper.NewManualTicker()
+ }
+
for _, limit := range cfg.Concurrency {
- result[limit.RPC] = NewLimiter(limit.MaxPerRepo, limit.MaxQueueSize, newPromMonitor(middleware, "gitaly", limit.RPC))
+ if limit.MaxQueueWait > 0 {
+ limit := limit
+ newTickerFunc = func() helper.Ticker {
+ return helper.NewTimerTicker(limit.MaxQueueWait.Duration())
+ }
+ }
+
+ result[limit.RPC] = NewLimiter(
+ limit.MaxPerRepo,
+ limit.MaxQueueSize,
+ newTickerFunc,
+ newPromMonitor(middleware, "gitaly", limit.RPC),
+ )
}
// Set default for ReplicateRepository.
replicateRepositoryFullMethod := "/gitaly.RepositoryService/ReplicateRepository"
if _, ok := result[replicateRepositoryFullMethod]; !ok {
- result[replicateRepositoryFullMethod] = NewLimiter(1, 0, newPromMonitor(middleware, "gitaly", replicateRepositoryFullMethod))
+ result[replicateRepositoryFullMethod] = NewLimiter(
+ 1,
+ 0,
+ func() helper.Ticker {
+ return helper.NewManualTicker()
+ },
+ newPromMonitor(middleware, "gitaly", replicateRepositoryFullMethod))
}
return result
diff --git a/internal/testhelper/testhelper.go b/internal/testhelper/testhelper.go
index a9ebfe62a..5375a8082 100644
--- a/internal/testhelper/testhelper.go
+++ b/internal/testhelper/testhelper.go
@@ -191,6 +191,9 @@ func ContextWithoutCancel(opts ...ContextOpt) context.Context {
// ConcurrencyQueueEnforceMax is in the codepath of every RPC call since its in the limithandler
// middleware.
ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.ConcurrencyQueueEnforceMax, true)
+ // ConcurrencyQueueMaxWait is in the codepath of every RPC call since it's in the limithandler
+ // middleware.
+ ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.ConcurrencyQueueMaxWait, true)
// We use hook directories everywhere, so it's infeasible to test this on a global
// scale. Instead, we use it randomly.
ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.HooksInTempdir, mrand.Int()%2 == 0)