
gitlab.com/gitlab-org/gitaly.git
author    John Cai <jcai@gitlab.com>  2022-05-10 01:46:13 +0300
committer John Cai <jcai@gitlab.com>  2022-05-10 02:55:40 +0300
commit    38932ea95ce1a578762d45d40b967ec2271cf328 (patch)
tree      9611c3d326c3969d05e8c28571c529619ff506fa
parent    8940011420764b977984d122b0a63871627806c4 (diff)
limithandler: Fix flaky TestStreamLimitHandler (branch: jc-fix-limithandler-flake)
The TestStreamLimitHandler test was relying on a hardcoded timeout, which caused flakiness in CI. We don't need to rely on a timeout, however: since we have a concurrency queue limiter, we can use it to make this test synchronous.
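
As a rough sketch of the pattern (not the actual test code; blockCh and respCh here merely echo the names used in the diff below), the idea is to trade a fixed sleep for explicit channel synchronization:

package main

import "fmt"

// Minimal, self-contained sketch of the synchronization pattern behind the
// fix (assumed names; this is not the actual test code). Each worker blocks
// on blockCh the way the test server blocks its handlers, then reports on
// respCh. The caller closes blockCh and waits for exactly totalCalls
// responses instead of sleeping for a fixed duration.
func main() {
	const totalCalls = 10
	blockCh := make(chan struct{})
	respCh := make(chan struct{})

	for i := 0; i < totalCalls; i++ {
		go func() {
			<-blockCh            // blocked, like a handler waiting on the server's blockCh
			respCh <- struct{}{} // report completion explicitly
		}()
	}

	close(blockCh) // release every blocked worker at once

	for i := 0; i < totalCalls; i++ {
		<-respCh // deterministic: no hardcoded timeout needed
	}
	fmt.Printf("observed %d responses\n", totalCalls)
}
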
-rw-r--r--  internal/middleware/limithandler/middleware_test.go  126
-rw-r--r--  internal/middleware/limithandler/testhelper_test.go    7
2 files changed, 98 insertions, 35 deletions
diff --git a/internal/middleware/limithandler/middleware_test.go b/internal/middleware/limithandler/middleware_test.go
index 729b76626..7828c2ea7 100644
--- a/internal/middleware/limithandler/middleware_test.go
+++ b/internal/middleware/limithandler/middleware_test.go
@@ -13,6 +13,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler"
pb "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler/testdata"
@@ -81,49 +82,66 @@ func TestStreamLimitHandler(t *testing.T) {
t.Parallel()
testCases := []struct {
- desc string
- fullname string
- f func(*testing.T, context.Context, pb.TestClient)
- maxConcurrency int
- expectedRequestCount int
+ desc string
+ fullname string
+ f func(*testing.T, context.Context, pb.TestClient, chan interface{}, chan error)
+ maxConcurrency int
+ expectedRequestCount int
+ expectedResponseCount int
}{
+ // The max queue size is set at 1, which means 1 request
+ // will be queued while the later ones will return with
+ // an error. That means that maxConcurrency number of
+ // requests will be processing but blocked due to blockCh.
+ // 1 request will be waiting to be picked up, and will be
+ // processed once we close the blockCh.
{
desc: "Single request, multiple responses",
fullname: "/test.limithandler.Test/StreamOutput",
- f: func(t *testing.T, ctx context.Context, client pb.TestClient) {
+ f: func(t *testing.T, ctx context.Context, client pb.TestClient, respCh chan interface{}, errCh chan error) {
stream, err := client.StreamOutput(ctx, &pb.StreamOutputRequest{})
require.NoError(t, err)
require.NotNil(t, stream)
r, err := stream.Recv()
- require.NoError(t, err)
+ if err != nil {
+ errCh <- err
+ return
+ }
require.NotNil(t, r)
require.True(t, r.Ok)
+ respCh <- r
},
- maxConcurrency: 3,
- expectedRequestCount: 3,
+ maxConcurrency: 3,
+ expectedRequestCount: 4,
+ expectedResponseCount: 4,
},
{
desc: "Multiple requests, single response",
fullname: "/test.limithandler.Test/StreamInput",
- f: func(t *testing.T, ctx context.Context, client pb.TestClient) {
+ f: func(t *testing.T, ctx context.Context, client pb.TestClient, respCh chan interface{}, errCh chan error) {
stream, err := client.StreamInput(ctx)
require.NoError(t, err)
require.NotNil(t, stream)
require.NoError(t, stream.Send(&pb.StreamInputRequest{}))
r, err := stream.CloseAndRecv()
- require.NoError(t, err)
+ if err != nil {
+ errCh <- err
+ return
+ }
require.NotNil(t, r)
require.True(t, r.Ok)
+ respCh <- r
},
- maxConcurrency: 3,
- expectedRequestCount: 3,
+ maxConcurrency: 3,
+ expectedRequestCount: 4,
+ expectedResponseCount: 4,
},
{
desc: "Multiple requests, multiple responses",
fullname: "/test.limithandler.Test/Bidirectional",
- f: func(t *testing.T, ctx context.Context, client pb.TestClient) {
+ f: func(t *testing.T, ctx context.Context, client pb.TestClient, respCh chan interface{}, errCh chan error) {
stream, err := client.Bidirectional(ctx)
require.NoError(t, err)
require.NotNil(t, stream)
@@ -132,19 +150,24 @@ func TestStreamLimitHandler(t *testing.T) {
require.NoError(t, stream.CloseSend())
r, err := stream.Recv()
- require.NoError(t, err)
+ if err != nil {
+ errCh <- err
+ return
+ }
require.NotNil(t, r)
require.True(t, r.Ok)
+ respCh <- r
},
- maxConcurrency: 3,
- expectedRequestCount: 3,
+ maxConcurrency: 3,
+ expectedRequestCount: 4,
+ expectedResponseCount: 4,
},
{
// Make sure that _streams_ are limited but that _requests_ on each
// allowed stream are not limited.
desc: "Multiple requests with same id, multiple responses",
fullname: "/test.limithandler.Test/Bidirectional",
- f: func(t *testing.T, ctx context.Context, client pb.TestClient) {
+ f: func(t *testing.T, ctx context.Context, client pb.TestClient, respCh chan interface{}, errCh chan error) {
stream, err := client.Bidirectional(ctx)
require.NoError(t, err)
require.NotNil(t, stream)
@@ -158,29 +181,40 @@ func TestStreamLimitHandler(t *testing.T) {
require.NoError(t, stream.CloseSend())
r, err := stream.Recv()
- require.NoError(t, err)
+ if err != nil {
+ errCh <- err
+ return
+ }
require.NotNil(t, r)
require.True(t, r.Ok)
+ respCh <- r
},
maxConcurrency: 3,
// 3 (concurrent streams allowed) * 10 (requests per stream)
- expectedRequestCount: 30,
+ // + 1 (queued stream) * (10 requests per stream)
+ expectedRequestCount: 40,
+ expectedResponseCount: 4,
},
{
desc: "With a max concurrency of 0",
fullname: "/test.limithandler.Test/StreamOutput",
- f: func(t *testing.T, ctx context.Context, client pb.TestClient) {
+ f: func(t *testing.T, ctx context.Context, client pb.TestClient, respCh chan interface{}, errCh chan error) {
stream, err := client.StreamOutput(ctx, &pb.StreamOutputRequest{})
require.NoError(t, err)
require.NotNil(t, stream)
r, err := stream.Recv()
- require.NoError(t, err)
+ if err != nil {
+ errCh <- err
+ return
+ }
require.NotNil(t, r)
require.True(t, r.Ok)
+ respCh <- r
},
- maxConcurrency: 0,
- expectedRequestCount: 10, // Allow all
+ maxConcurrency: 0,
+ expectedRequestCount: 10,
+ expectedResponseCount: 10,
},
}
@@ -190,9 +224,14 @@ func TestStreamLimitHandler(t *testing.T) {
s := &server{blockCh: make(chan struct{})}
+ maxQueueSize := 1
cfg := config.Cfg{
Concurrency: []config.Concurrency{
- {RPC: tc.fullname, MaxPerRepo: tc.maxConcurrency},
+ {
+ RPC: tc.fullname,
+ MaxPerRepo: tc.maxConcurrency,
+ MaxQueueSize: maxQueueSize,
+ },
},
}
@@ -203,23 +242,40 @@ func TestStreamLimitHandler(t *testing.T) {
client, conn := newClient(t, serverSocketPath)
defer conn.Close()
- ctx := testhelper.Context(t)
+ ctx := featureflag.IncomingCtxWithFeatureFlag(
+ testhelper.Context(t),
+ featureflag.ConcurrencyQueueEnforceMax,
+ true,
+ )
- wg := &sync.WaitGroup{}
- for i := 0; i < 10; i++ {
- wg.Add(1)
+ totalCalls := 10
+
+ errChan := make(chan error)
+ respChan := make(chan interface{})
+
+ for i := 0; i < totalCalls; i++ {
go func() {
- defer wg.Done()
- tc.f(t, ctx, client)
+ tc.f(t, ctx, client, respChan, errChan)
}()
}
- time.Sleep(100 * time.Millisecond)
-
- require.Equal(t, tc.expectedRequestCount, s.getRequestCount())
+ if tc.maxConcurrency > 0 {
+ for i := 0; i < totalCalls-tc.maxConcurrency-maxQueueSize; i++ {
+ err := <-errChan
+ testhelper.RequireGrpcError(
+ t,
+ helper.ErrInternalf("rate limiting stream request: %w",
+ limithandler.ErrMaxQueueSize), err)
+ }
+ }
close(s.blockCh)
- wg.Wait()
+
+ for i := 0; i < tc.expectedResponseCount; i++ {
+ <-respChan
+ }
+
+ require.Equal(t, tc.expectedRequestCount, s.getRequestCount())
})
}
}
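
For context on the counts asserted in the hunk above, here is a worked example of the accounting, assuming the same parameters as the test (10 calls, maxConcurrency = 3, maxQueueSize = 1); it is illustrative only:

package main

import "fmt"

// Worked example of the limiter accounting (illustrative, using the same
// numbers as the test): maxConcurrency calls are admitted and block on
// blockCh, maxQueueSize calls wait in the queue, and every remaining call
// fails fast with ErrMaxQueueSize.
func main() {
	totalCalls, maxConcurrency, maxQueueSize := 10, 3, 1

	rejected := totalCalls - maxConcurrency - maxQueueSize // errors read from errChan
	served := maxConcurrency + maxQueueSize                // expectedResponseCount

	fmt.Printf("rejected with ErrMaxQueueSize: %d\n", rejected) // 6
	fmt.Printf("eventually served: %d\n", served)               // 4
}
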
diff --git a/internal/middleware/limithandler/testhelper_test.go b/internal/middleware/limithandler/testhelper_test.go
index 7ae5c7046..ce533d710 100644
--- a/internal/middleware/limithandler/testhelper_test.go
+++ b/internal/middleware/limithandler/testhelper_test.go
@@ -2,6 +2,7 @@ package limithandler_test
import (
"context"
+ "io"
"sync/atomic"
pb "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler/testdata"
@@ -41,6 +42,9 @@ func (s *server) StreamInput(stream pb.Test_StreamInputServer) error {
// Read all the input
for {
if _, err := stream.Recv(); err != nil {
+ if err != io.EOF {
+ return err
+ }
break
}
@@ -56,6 +60,9 @@ func (s *server) Bidirectional(stream pb.Test_BidirectionalServer) error {
// Read all the input
for {
if _, err := stream.Recv(); err != nil {
+ if err != io.EOF {
+ return err
+ }
break
}
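
The testhelper change above stops swallowing real receive errors: io.EOF still ends the read loop cleanly, while anything else is returned to the gRPC layer. A minimal sketch of that pattern (fakeStream is a stand-in for the generated gRPC stream types, not Gitaly code):

package main

import (
	"errors"
	"fmt"
	"io"
)

// fakeStream stands in for a generated gRPC server stream.
type fakeStream struct{ msgs int }

func (s *fakeStream) Recv() (struct{}, error) {
	if s.msgs == 0 {
		return struct{}{}, io.EOF
	}
	s.msgs--
	return struct{}{}, nil
}

// drainInput mirrors the "read all the input" loop in the diff: io.EOF is
// the expected end of the client stream, any other error is propagated.
func drainInput(stream *fakeStream) error {
	for {
		if _, err := stream.Recv(); err != nil {
			if !errors.Is(err, io.EOF) {
				return err // a real failure: surface it
			}
			break // clean end of stream
		}
	}
	return nil
}

func main() {
	fmt.Println(drainInput(&fakeStream{msgs: 3})) // <nil>
}
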