diff options
author | John Cai <jcai@gitlab.com> | 2022-01-20 00:46:50 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2022-02-01 07:22:29 +0300 |
commit | 023fee7de6eb7ea428fb3e089bffabe51b8c05e7 (patch) | |
tree | 72511e135ffe696f3723a2b714ae70cb84710e80 | |
parent | 2b9720a51e6f447af10327b6dd275e91f2029bc7 (diff) |
limithandler: enable max queue wait time
Concurrency queues can grow boundlessly. This can lead to production
incidents. Add a timeout for how long a request can remain in a
concurrency queue for.
Changelog: added
-rw-r--r-- | internal/gitaly/config/config.go | 3 | ||||
-rw-r--r-- | internal/metadata/featureflag/ff_max_queue_wait_time.go | 5 | ||||
-rw-r--r-- | internal/middleware/limithandler/concurrency_limiter.go | 50 | ||||
-rw-r--r-- | internal/middleware/limithandler/concurrency_limiter_test.go | 138 | ||||
-rw-r--r-- | internal/middleware/limithandler/middleware.go | 28 | ||||
-rw-r--r-- | internal/testhelper/testhelper.go | 3 |
6 files changed, 207 insertions, 20 deletions
diff --git a/internal/gitaly/config/config.go b/internal/gitaly/config/config.go index 50be0489a..9bd46f2df 100644 --- a/internal/gitaly/config/config.go +++ b/internal/gitaly/config/config.go @@ -143,6 +143,9 @@ type Concurrency struct { // MaxQueueSize is the maximum number of requests in the queue waiting to be picked up // after which subsequent requests will return with an error. MaxQueueSize int `toml:"max_queue_size"` + // MaxQueueWait is the maximum time a request can remain in the concurrency queue + // waiting to be picked up by Gitaly + MaxQueueWait Duration `toml:"max_queue_wait"` } // StreamCacheConfig contains settings for a streamcache instance. diff --git a/internal/metadata/featureflag/ff_max_queue_wait_time.go b/internal/metadata/featureflag/ff_max_queue_wait_time.go new file mode 100644 index 000000000..2cd281a5b --- /dev/null +++ b/internal/metadata/featureflag/ff_max_queue_wait_time.go @@ -0,0 +1,5 @@ +package featureflag + +// ConcurrencyQueueMaxWait will enable the concurrency limiter to drop requests that are waiting in +// the concurrency queue for longer than the configured time. +var ConcurrencyQueueMaxWait = NewFeatureFlag("concurrency_queue_max_wait", false) diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go index e7ccf1e59..92e5f965d 100644 --- a/internal/middleware/limithandler/concurrency_limiter.go +++ b/internal/middleware/limithandler/concurrency_limiter.go @@ -7,12 +7,20 @@ import ( "sync" "time" + "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" ) +// ErrMaxQueueTime indicates a request has reached the maximum time allowed to wait in the +// concurrency queue. +var ErrMaxQueueTime = errors.New("maximum time in concurrency queue reached") + // LimitedFunc represents a function that will be limited type LimitedFunc func() (resp interface{}, err error) +// QueueTickerCreator is a function that provides a ticker +type QueueTickerCreator func() helper.Ticker + // ConcurrencyLimiter contains rate limiter state type ConcurrencyLimiter struct { semaphores map[string]*semaphoreReference @@ -23,20 +31,36 @@ type ConcurrencyLimiter struct { queued int64 // queuedLimit is the maximum number of operations allowed to wait in a queued state. // subsequent incoming operations will fail with an error. - queuedLimit int64 - monitor ConcurrencyMonitor - mux sync.RWMutex + queuedLimit int64 + monitor ConcurrencyMonitor + mux sync.RWMutex + maxWaitTickerGetter QueueTickerCreator } type semaphoreReference struct { - tokens chan struct{} - count int + tokens chan struct{} + count int + newTicker QueueTickerCreator } func (sem *semaphoreReference) acquire(ctx context.Context) error { + var ticker helper.Ticker + + if featureflag.ConcurrencyQueueMaxWait.IsEnabled(ctx) && + sem.newTicker != nil { + ticker = sem.newTicker() + } else { + ticker = helper.Ticker(helper.NewManualTicker()) + } + + defer ticker.Stop() + ticker.Reset() + select { case sem.tokens <- struct{}{}: return nil + case <-ticker.C(): + return ErrMaxQueueTime case <-ctx.Done(): return ctx.Err() } @@ -50,7 +74,10 @@ func (c *ConcurrencyLimiter) getSemaphore(lockKey string) *semaphoreReference { defer c.mux.Unlock() if c.semaphores[lockKey] == nil { - c.semaphores[lockKey] = &semaphoreReference{tokens: make(chan struct{}, c.maxPerKey)} + c.semaphores[lockKey] = &semaphoreReference{ + tokens: make(chan struct{}, c.maxPerKey), + newTicker: c.maxWaitTickerGetter, + } } c.semaphores[lockKey].count++ @@ -143,15 +170,16 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey string, f Limite } // NewLimiter creates a new rate limiter -func NewLimiter(perKeyLimit, globalLimit int, monitor ConcurrencyMonitor) *ConcurrencyLimiter { +func NewLimiter(perKeyLimit, globalLimit int, maxWaitTickerGetter QueueTickerCreator, monitor ConcurrencyMonitor) *ConcurrencyLimiter { if monitor == nil { monitor = &nullConcurrencyMonitor{} } return &ConcurrencyLimiter{ - semaphores: make(map[string]*semaphoreReference), - maxPerKey: int64(perKeyLimit), - queuedLimit: int64(globalLimit), - monitor: monitor, + semaphores: make(map[string]*semaphoreReference), + maxPerKey: int64(perKeyLimit), + queuedLimit: int64(globalLimit), + monitor: monitor, + maxWaitTickerGetter: maxWaitTickerGetter, } } diff --git a/internal/middleware/limithandler/concurrency_limiter_test.go b/internal/middleware/limithandler/concurrency_limiter_test.go index 989f9478a..e581b17e2 100644 --- a/internal/middleware/limithandler/concurrency_limiter_test.go +++ b/internal/middleware/limithandler/concurrency_limiter_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" ) @@ -138,7 +139,12 @@ func TestLimiter(t *testing.T) { gauge := &counter{} - limiter := NewLimiter(tt.maxConcurrency, 0, gauge) + limiter := NewLimiter( + tt.maxConcurrency, + 0, + nil, + gauge, + ) wg := sync.WaitGroup{} wg.Add(tt.concurrency) @@ -208,16 +214,16 @@ func TestLimiter(t *testing.T) { } } -type blockingCounter struct { +type blockingQueueCounter struct { counter - ch chan struct{} + queuedCh chan struct{} } // Queued will block on a channel. We need a way to synchronize on when a Limiter has attempted to acquire // a semaphore but has not yet. The caller can use the channel to wait for many requests to be queued -func (b *blockingCounter) Queued(_ context.Context) { - b.ch <- struct{}{} +func (b *blockingQueueCounter) Queued(_ context.Context) { + b.queuedCh <- struct{}{} } func TestConcurrencyLimiter_queueLimit(t *testing.T) { @@ -247,9 +253,9 @@ func TestConcurrencyLimiter_queueLimit(t *testing.T) { ) monitorCh := make(chan struct{}) - gauge := &blockingCounter{ch: monitorCh} + gauge := &blockingQueueCounter{queuedCh: monitorCh} ch := make(chan struct{}) - limiter := NewLimiter(1, queueLimit, gauge) + limiter := NewLimiter(1, queueLimit, nil, gauge) // occupied with one live request that takes a long time to complete go func() { @@ -308,3 +314,121 @@ func TestConcurrencyLimiter_queueLimit(t *testing.T) { }) } } + +type blockingDequeueCounter struct { + counter + + dequeuedCh chan struct{} +} + +// Dequeued will block on a channel. We need a way to synchronize on when a Limiter has successfully +// acquired a semaphore but has not yet. The caller can use the channel to wait for many requests to +// be queued +func (b *blockingDequeueCounter) Dequeued(context.Context) { + b.dequeuedCh <- struct{}{} +} + +func TestLimitConcurrency_queueWaitTime(t *testing.T) { + ctx := testhelper.Context(t) + + t.Run("with feature flag", func(t *testing.T) { + ctx = featureflag.IncomingCtxWithFeatureFlag( + ctx, + featureflag.ConcurrencyQueueMaxWait, + true, + ) + + ticker := helper.NewManualTicker() + + dequeuedCh := make(chan struct{}) + gauge := &blockingDequeueCounter{dequeuedCh: dequeuedCh} + limiter := NewLimiter( + 1, + 0, + func() helper.Ticker { + return ticker + }, + gauge, + ) + + ch := make(chan struct{}) + var wg sync.WaitGroup + wg.Add(1) + go func() { + _, err := limiter.Limit(ctx, "key", func() (interface{}, error) { + <-ch + return nil, nil + }) + require.NoError(t, err) + wg.Done() + }() + + <-dequeuedCh + + ticker.Tick() + + errChan := make(chan error) + go func() { + _, err := limiter.Limit(ctx, "key", func() (interface{}, error) { + return nil, nil + }) + errChan <- err + }() + + <-dequeuedCh + err := <-errChan + + assert.Equal(t, ErrMaxQueueTime, err) + + close(ch) + wg.Wait() + }) + + t.Run("without feature flag", func(t *testing.T) { + ctx = featureflag.IncomingCtxWithFeatureFlag( + ctx, + featureflag.ConcurrencyQueueMaxWait, + false, + ) + + ticker := helper.NewManualTicker() + + dequeuedCh := make(chan struct{}) + gauge := &blockingDequeueCounter{dequeuedCh: dequeuedCh} + limiter := NewLimiter( + 1, + 0, + func() helper.Ticker { + return ticker + }, + gauge, + ) + + ch := make(chan struct{}) + go func() { + _, err := limiter.Limit(ctx, "key", func() (interface{}, error) { + <-ch + return nil, nil + }) + require.NoError(t, err) + }() + + <-dequeuedCh + + ticker.Tick() + + errChan := make(chan error) + go func() { + _, err := limiter.Limit(ctx, "key", func() (interface{}, error) { + return nil, nil + }) + errChan <- err + }() + + close(ch) + <-dequeuedCh + err := <-errChan + + assert.NoError(t, err) + }) +} diff --git a/internal/middleware/limithandler/middleware.go b/internal/middleware/limithandler/middleware.go index 48ec48ca4..347ceccf8 100644 --- a/internal/middleware/limithandler/middleware.go +++ b/internal/middleware/limithandler/middleware.go @@ -7,6 +7,7 @@ import ( grpcmwtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" "github.com/prometheus/client_golang/prometheus" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "google.golang.org/grpc" ) @@ -119,14 +120,37 @@ func (c *LimiterMiddleware) StreamInterceptor() grpc.StreamServerInterceptor { func createLimiterConfig(middleware *LimiterMiddleware, cfg config.Cfg) map[string]*ConcurrencyLimiter { result := make(map[string]*ConcurrencyLimiter) + + newTickerFunc := func() helper.Ticker { + return helper.NewManualTicker() + } + for _, limit := range cfg.Concurrency { - result[limit.RPC] = NewLimiter(limit.MaxPerRepo, limit.MaxQueueSize, newPromMonitor(middleware, "gitaly", limit.RPC)) + if limit.MaxQueueWait > 0 { + limit := limit + newTickerFunc = func() helper.Ticker { + return helper.NewTimerTicker(limit.MaxQueueWait.Duration()) + } + } + + result[limit.RPC] = NewLimiter( + limit.MaxPerRepo, + limit.MaxQueueSize, + newTickerFunc, + newPromMonitor(middleware, "gitaly", limit.RPC), + ) } // Set default for ReplicateRepository. replicateRepositoryFullMethod := "/gitaly.RepositoryService/ReplicateRepository" if _, ok := result[replicateRepositoryFullMethod]; !ok { - result[replicateRepositoryFullMethod] = NewLimiter(1, 0, newPromMonitor(middleware, "gitaly", replicateRepositoryFullMethod)) + result[replicateRepositoryFullMethod] = NewLimiter( + 1, + 0, + func() helper.Ticker { + return helper.NewManualTicker() + }, + newPromMonitor(middleware, "gitaly", replicateRepositoryFullMethod)) } return result diff --git a/internal/testhelper/testhelper.go b/internal/testhelper/testhelper.go index a9ebfe62a..5375a8082 100644 --- a/internal/testhelper/testhelper.go +++ b/internal/testhelper/testhelper.go @@ -191,6 +191,9 @@ func ContextWithoutCancel(opts ...ContextOpt) context.Context { // ConcurrencyQueueEnforceMax is in the codepath of every RPC call since its in the limithandler // middleware. ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.ConcurrencyQueueEnforceMax, true) + // ConcurrencyQueueMaxWait is in the codepath of every RPC call since it's in the limithandler + // middleware. + ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.ConcurrencyQueueMaxWait, true) // We use hook directories everywhere, so it's infeasible to test this on a global // scale. Instead, we use it randomly. ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.HooksInTempdir, mrand.Int()%2 == 0) |