Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2022-06-13 21:04:10 +0300
committerJohn Cai <jcai@gitlab.com>2022-06-13 21:04:10 +0300
commitbb1723cdb97fbb14737e382e4930fabf33af79cf (patch)
tree4a325ec919cfac41a4f507f7460b99274b17e5f1
parent4e84eaf02f3a5403ef44c3f51b8e46c57e24ed02 (diff)
featureflag: Remove concurrency_queue_enforce_max feature flagjc-remove-max-queue-size-ff
Now that the concurrency_queue_enforce_max has been running in staging and production without issue, we can remove it.
-rw-r--r--internal/metadata/featureflag/ff_queue_max.go5
-rw-r--r--internal/middleware/limithandler/concurrency_limiter.go4
-rw-r--r--internal/middleware/limithandler/concurrency_limiter_test.go145
-rw-r--r--internal/middleware/limithandler/middleware_test.go18
-rw-r--r--internal/testhelper/testhelper.go3
5 files changed, 61 insertions, 114 deletions
diff --git a/internal/metadata/featureflag/ff_queue_max.go b/internal/metadata/featureflag/ff_queue_max.go
deleted file mode 100644
index 4b92a4f0f..000000000
--- a/internal/metadata/featureflag/ff_queue_max.go
+++ /dev/null
@@ -1,5 +0,0 @@
-package featureflag
-
-// ConcurrencyQueueEnforceMax enforces a maximum number of items that are waiting in a concurrency queue.
-// when this flag is turned on, subsequent requests that come in will be rejected with an error.
-var ConcurrencyQueueEnforceMax = NewFeatureFlag("concurrency_queue_enforce_max", false)
diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go
index 914bfdae8..e2595571b 100644
--- a/internal/middleware/limithandler/concurrency_limiter.go
+++ b/internal/middleware/limithandler/concurrency_limiter.go
@@ -11,7 +11,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v15/internal/helper"
- "gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
"google.golang.org/protobuf/types/known/durationpb"
)
@@ -118,8 +117,7 @@ func (c *ConcurrencyLimiter) queueInc(ctx context.Context) error {
c.mux.Lock()
defer c.mux.Unlock()
- if featureflag.ConcurrencyQueueEnforceMax.IsEnabled(ctx) &&
- c.queuedLimit > 0 &&
+ if c.queuedLimit > 0 &&
c.queued >= c.queuedLimit {
c.monitor.Dropped(ctx, "max_size")
return ErrMaxQueueSize
diff --git a/internal/middleware/limithandler/concurrency_limiter_test.go b/internal/middleware/limithandler/concurrency_limiter_test.go
index 718f3b622..417926907 100644
--- a/internal/middleware/limithandler/concurrency_limiter_test.go
+++ b/internal/middleware/limithandler/concurrency_limiter_test.go
@@ -10,7 +10,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v15/internal/helper"
- "gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
"google.golang.org/grpc/status"
@@ -244,100 +243,70 @@ func TestConcurrencyLimiter_queueLimit(t *testing.T) {
queueLimit := 10
ctx := testhelper.Context(t)
- testCases := []struct {
- desc string
- featureFlagOn bool
- }{
- {
- desc: "feature flag on",
- featureFlagOn: true,
- },
- {
- desc: "feature flag off",
- featureFlagOn: false,
- },
- }
-
- for _, tc := range testCases {
- t.Run(tc.desc, func(t *testing.T) {
- ctx = featureflag.IncomingCtxWithFeatureFlag(
- ctx,
- featureflag.ConcurrencyQueueEnforceMax,
- tc.featureFlagOn,
- )
+ monitorCh := make(chan struct{})
+ monitor := &blockingQueueCounter{queuedCh: monitorCh}
+ ch := make(chan struct{})
+ limiter := NewConcurrencyLimiter(1, queueLimit, nil, monitor)
- monitorCh := make(chan struct{})
- monitor := &blockingQueueCounter{queuedCh: monitorCh}
- ch := make(chan struct{})
- limiter := NewConcurrencyLimiter(1, queueLimit, nil, monitor)
-
- // occupied with one live request that takes a long time to complete
- go func() {
- _, err := limiter.Limit(ctx, "key", func() (interface{}, error) {
- ch <- struct{}{}
- <-ch
- return nil, nil
- })
- require.NoError(t, err)
- }()
-
- <-monitorCh
+ // occupied with one live request that takes a long time to complete
+ go func() {
+ _, err := limiter.Limit(ctx, "key", func() (interface{}, error) {
+ ch <- struct{}{}
<-ch
+ return nil, nil
+ })
+ require.NoError(t, err)
+ }()
- var wg sync.WaitGroup
- // fill up the queue
- for i := 0; i < queueLimit; i++ {
- wg.Add(1)
- go func() {
- _, err := limiter.Limit(ctx, "key", func() (interface{}, error) {
- return nil, nil
- })
- require.NoError(t, err)
- wg.Done()
- }()
- }
+ <-monitorCh
+ <-ch
- var queued int
- for range monitorCh {
- queued++
- if queued == queueLimit {
- break
- }
- }
+ var wg sync.WaitGroup
+ // fill up the queue
+ for i := 0; i < queueLimit; i++ {
+ wg.Add(1)
+ go func() {
+ _, err := limiter.Limit(ctx, "key", func() (interface{}, error) {
+ return nil, nil
+ })
+ require.NoError(t, err)
+ wg.Done()
+ }()
+ }
- errChan := make(chan error, 1)
- go func() {
- _, err := limiter.Limit(ctx, "key", func() (interface{}, error) {
- return nil, nil
- })
- errChan <- err
- }()
-
- if tc.featureFlagOn {
- err := <-errChan
- assert.Error(t, err)
-
- s, ok := status.FromError(err)
- require.True(t, ok)
- details := s.Details()
- require.Len(t, details, 1)
-
- limitErr, ok := details[0].(*gitalypb.LimitError)
- require.True(t, ok)
-
- assert.Equal(t, ErrMaxQueueSize.Error(), limitErr.ErrorMessage)
- assert.Equal(t, durationpb.New(0), limitErr.RetryAfter)
- assert.Equal(t, monitor.droppedSize, 1)
- } else {
- <-monitorCh
- assert.Equal(t, int64(queueLimit+1), limiter.queued)
- assert.Equal(t, monitor.droppedSize, 0)
- }
+ var queued int
+ for range monitorCh {
+ queued++
+ if queued == queueLimit {
+ break
+ }
+ }
- close(ch)
- wg.Wait()
+ errChan := make(chan error, 1)
+ go func() {
+ _, err := limiter.Limit(ctx, "key", func() (interface{}, error) {
+ return nil, nil
})
- }
+ errChan <- err
+ }()
+
+ err := <-errChan
+ assert.Error(t, err)
+
+ s, ok := status.FromError(err)
+ require.True(t, ok)
+ details := s.Details()
+ require.Len(t, details, 1)
+
+ limitErr, ok := details[0].(*gitalypb.LimitError)
+ require.True(t, ok)
+
+ assert.Equal(t, ErrMaxQueueSize.Error(), limitErr.ErrorMessage)
+ assert.Equal(t, durationpb.New(0), limitErr.RetryAfter)
+ assert.Equal(t, monitor.droppedSize, 1)
+
+ close(ch)
+ wg.Wait()
}
type blockingDequeueCounter struct {
diff --git a/internal/middleware/limithandler/middleware_test.go b/internal/middleware/limithandler/middleware_test.go
index ecd26f942..52738cc50 100644
--- a/internal/middleware/limithandler/middleware_test.go
+++ b/internal/middleware/limithandler/middleware_test.go
@@ -242,11 +242,7 @@ func TestStreamLimitHandler(t *testing.T) {
client, conn := newClient(t, serverSocketPath)
defer conn.Close()
- ctx := featureflag.IncomingCtxWithFeatureFlag(
- testhelper.Context(t),
- featureflag.ConcurrencyQueueEnforceMax,
- true,
- )
+ ctx := testhelper.Context(t)
totalCalls := 10
@@ -300,11 +296,7 @@ func TestStreamLimitHandler_error(t *testing.T) {
client, conn := newClient(t, serverSocketPath)
defer conn.Close()
- ctx := featureflag.IncomingCtxWithFeatureFlag(
- testhelper.Context(t),
- featureflag.ConcurrencyQueueEnforceMax,
- true,
- )
+ ctx := testhelper.Context(t)
respChan := make(chan *pb.BidirectionalResponse)
go func() {
@@ -416,11 +408,7 @@ func TestConcurrencyLimitHandlerMetrics(t *testing.T) {
client, conn := newClient(t, serverSocketPath)
defer conn.Close()
- ctx := featureflag.IncomingCtxWithFeatureFlag(
- testhelper.Context(t),
- featureflag.ConcurrencyQueueEnforceMax,
- true,
- )
+ ctx := testhelper.Context(t)
respCh := make(chan *pb.UnaryResponse)
go func() {
diff --git a/internal/testhelper/testhelper.go b/internal/testhelper/testhelper.go
index 15e6a916e..db65feeb3 100644
--- a/internal/testhelper/testhelper.go
+++ b/internal/testhelper/testhelper.go
@@ -173,9 +173,6 @@ func ContextWithoutCancel(opts ...ContextOpt) context.Context {
// deep in the call stack, so almost every test function would have to inject it into its
// context.
ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.RunCommandsInCGroup, true)
- // ConcurrencyQueueEnforceMax is in the codepath of every RPC call since its in the limithandler
- // middleware.
- ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.ConcurrencyQueueEnforceMax, true)
// Randomly inject the Git flag so that we have coverage of tests with both old and new Git
// version by pure chance.
ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.GitV2361Gl1, rnd.Int()%2 == 0)