diff options
author | John Cai <jcai@gitlab.com> | 2022-06-13 21:04:10 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2022-06-13 21:04:10 +0300 |
commit | bb1723cdb97fbb14737e382e4930fabf33af79cf (patch) | |
tree | 4a325ec919cfac41a4f507f7460b99274b17e5f1 | |
parent | 4e84eaf02f3a5403ef44c3f51b8e46c57e24ed02 (diff) |
featureflag: Remove concurrency_queue_enforce_max feature flag (branch: jc-remove-max-queue-size-ff)
Now that the concurrency_queue_enforce_max feature flag has been running in staging
and production without issue, we can remove it.
-rw-r--r-- | internal/metadata/featureflag/ff_queue_max.go | 5 | ||||
-rw-r--r-- | internal/middleware/limithandler/concurrency_limiter.go | 4 | ||||
-rw-r--r-- | internal/middleware/limithandler/concurrency_limiter_test.go | 145 | ||||
-rw-r--r-- | internal/middleware/limithandler/middleware_test.go | 18 | ||||
-rw-r--r-- | internal/testhelper/testhelper.go | 3 |
5 files changed, 61 insertions, 114 deletions
diff --git a/internal/metadata/featureflag/ff_queue_max.go b/internal/metadata/featureflag/ff_queue_max.go deleted file mode 100644 index 4b92a4f0f..000000000 --- a/internal/metadata/featureflag/ff_queue_max.go +++ /dev/null @@ -1,5 +0,0 @@ -package featureflag - -// ConcurrencyQueueEnforceMax enforces a maximum number of items that are waiting in a concurrency queue. -// when this flag is turned on, subsequent requests that come in will be rejected with an error. -var ConcurrencyQueueEnforceMax = NewFeatureFlag("concurrency_queue_enforce_max", false) diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go index 914bfdae8..e2595571b 100644 --- a/internal/middleware/limithandler/concurrency_limiter.go +++ b/internal/middleware/limithandler/concurrency_limiter.go @@ -11,7 +11,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v15/internal/helper" - "gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb" "google.golang.org/protobuf/types/known/durationpb" ) @@ -118,8 +117,7 @@ func (c *ConcurrencyLimiter) queueInc(ctx context.Context) error { c.mux.Lock() defer c.mux.Unlock() - if featureflag.ConcurrencyQueueEnforceMax.IsEnabled(ctx) && - c.queuedLimit > 0 && + if c.queuedLimit > 0 && c.queued >= c.queuedLimit { c.monitor.Dropped(ctx, "max_size") return ErrMaxQueueSize diff --git a/internal/middleware/limithandler/concurrency_limiter_test.go b/internal/middleware/limithandler/concurrency_limiter_test.go index 718f3b622..417926907 100644 --- a/internal/middleware/limithandler/concurrency_limiter_test.go +++ b/internal/middleware/limithandler/concurrency_limiter_test.go @@ -10,7 +10,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v15/internal/helper" - 
"gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb" "google.golang.org/grpc/status" @@ -244,100 +243,70 @@ func TestConcurrencyLimiter_queueLimit(t *testing.T) { queueLimit := 10 ctx := testhelper.Context(t) - testCases := []struct { - desc string - featureFlagOn bool - }{ - { - desc: "feature flag on", - featureFlagOn: true, - }, - { - desc: "feature flag off", - featureFlagOn: false, - }, - } - - for _, tc := range testCases { - t.Run(tc.desc, func(t *testing.T) { - ctx = featureflag.IncomingCtxWithFeatureFlag( - ctx, - featureflag.ConcurrencyQueueEnforceMax, - tc.featureFlagOn, - ) + monitorCh := make(chan struct{}) + monitor := &blockingQueueCounter{queuedCh: monitorCh} + ch := make(chan struct{}) + limiter := NewConcurrencyLimiter(1, queueLimit, nil, monitor) - monitorCh := make(chan struct{}) - monitor := &blockingQueueCounter{queuedCh: monitorCh} - ch := make(chan struct{}) - limiter := NewConcurrencyLimiter(1, queueLimit, nil, monitor) - - // occupied with one live request that takes a long time to complete - go func() { - _, err := limiter.Limit(ctx, "key", func() (interface{}, error) { - ch <- struct{}{} - <-ch - return nil, nil - }) - require.NoError(t, err) - }() - - <-monitorCh + // occupied with one live request that takes a long time to complete + go func() { + _, err := limiter.Limit(ctx, "key", func() (interface{}, error) { + ch <- struct{}{} <-ch + return nil, nil + }) + require.NoError(t, err) + }() - var wg sync.WaitGroup - // fill up the queue - for i := 0; i < queueLimit; i++ { - wg.Add(1) - go func() { - _, err := limiter.Limit(ctx, "key", func() (interface{}, error) { - return nil, nil - }) - require.NoError(t, err) - wg.Done() - }() - } + <-monitorCh + <-ch - var queued int - for range monitorCh { - queued++ - if queued == queueLimit { - break - } - } + var wg sync.WaitGroup + // fill up the queue + for i := 0; i < 
queueLimit; i++ { + wg.Add(1) + go func() { + _, err := limiter.Limit(ctx, "key", func() (interface{}, error) { + return nil, nil + }) + require.NoError(t, err) + wg.Done() + }() + } - errChan := make(chan error, 1) - go func() { - _, err := limiter.Limit(ctx, "key", func() (interface{}, error) { - return nil, nil - }) - errChan <- err - }() - - if tc.featureFlagOn { - err := <-errChan - assert.Error(t, err) - - s, ok := status.FromError(err) - require.True(t, ok) - details := s.Details() - require.Len(t, details, 1) - - limitErr, ok := details[0].(*gitalypb.LimitError) - require.True(t, ok) - - assert.Equal(t, ErrMaxQueueSize.Error(), limitErr.ErrorMessage) - assert.Equal(t, durationpb.New(0), limitErr.RetryAfter) - assert.Equal(t, monitor.droppedSize, 1) - } else { - <-monitorCh - assert.Equal(t, int64(queueLimit+1), limiter.queued) - assert.Equal(t, monitor.droppedSize, 0) - } + var queued int + for range monitorCh { + queued++ + if queued == queueLimit { + break + } + } - close(ch) - wg.Wait() + errChan := make(chan error, 1) + go func() { + _, err := limiter.Limit(ctx, "key", func() (interface{}, error) { + return nil, nil }) - } + errChan <- err + }() + + err := <-errChan + assert.Error(t, err) + + s, ok := status.FromError(err) + require.True(t, ok) + details := s.Details() + require.Len(t, details, 1) + + limitErr, ok := details[0].(*gitalypb.LimitError) + require.True(t, ok) + + assert.Equal(t, ErrMaxQueueSize.Error(), limitErr.ErrorMessage) + assert.Equal(t, durationpb.New(0), limitErr.RetryAfter) + assert.Equal(t, monitor.droppedSize, 1) + + close(ch) + wg.Wait() } type blockingDequeueCounter struct { diff --git a/internal/middleware/limithandler/middleware_test.go b/internal/middleware/limithandler/middleware_test.go index ecd26f942..52738cc50 100644 --- a/internal/middleware/limithandler/middleware_test.go +++ b/internal/middleware/limithandler/middleware_test.go @@ -242,11 +242,7 @@ func TestStreamLimitHandler(t *testing.T) { client, conn := 
newClient(t, serverSocketPath) defer conn.Close() - ctx := featureflag.IncomingCtxWithFeatureFlag( - testhelper.Context(t), - featureflag.ConcurrencyQueueEnforceMax, - true, - ) + ctx := testhelper.Context(t) totalCalls := 10 @@ -300,11 +296,7 @@ func TestStreamLimitHandler_error(t *testing.T) { client, conn := newClient(t, serverSocketPath) defer conn.Close() - ctx := featureflag.IncomingCtxWithFeatureFlag( - testhelper.Context(t), - featureflag.ConcurrencyQueueEnforceMax, - true, - ) + ctx := testhelper.Context(t) respChan := make(chan *pb.BidirectionalResponse) go func() { @@ -416,11 +408,7 @@ func TestConcurrencyLimitHandlerMetrics(t *testing.T) { client, conn := newClient(t, serverSocketPath) defer conn.Close() - ctx := featureflag.IncomingCtxWithFeatureFlag( - testhelper.Context(t), - featureflag.ConcurrencyQueueEnforceMax, - true, - ) + ctx := testhelper.Context(t) respCh := make(chan *pb.UnaryResponse) go func() { diff --git a/internal/testhelper/testhelper.go b/internal/testhelper/testhelper.go index 15e6a916e..db65feeb3 100644 --- a/internal/testhelper/testhelper.go +++ b/internal/testhelper/testhelper.go @@ -173,9 +173,6 @@ func ContextWithoutCancel(opts ...ContextOpt) context.Context { // deep in the call stack, so almost every test function would have to inject it into its // context. ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.RunCommandsInCGroup, true) - // ConcurrencyQueueEnforceMax is in the codepath of every RPC call since its in the limithandler - // middleware. - ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.ConcurrencyQueueEnforceMax, true) // Randomly inject the Git flag so that we have coverage of tests with both old and new Git // version by pure chance. ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.GitV2361Gl1, rnd.Int()%2 == 0) |