From 38932ea95ce1a578762d45d40b967ec2271cf328 Mon Sep 17 00:00:00 2001 From: John Cai Date: Mon, 9 May 2022 18:46:13 -0400 Subject: limithandler: Fix flaky TestStreamLimitHandler The TestStreamLimitHandler test was relying on a hardcoded timeout. This caused flakiness in CI. We don't need to rely on a timeout however, since we have a concurrency queue limiter, we can utilize that to make this test synchronous. --- .../middleware/limithandler/middleware_test.go | 126 +++++++++++++++------ .../middleware/limithandler/testhelper_test.go | 7 ++ 2 files changed, 98 insertions(+), 35 deletions(-) diff --git a/internal/middleware/limithandler/middleware_test.go b/internal/middleware/limithandler/middleware_test.go index 729b76626..7828c2ea7 100644 --- a/internal/middleware/limithandler/middleware_test.go +++ b/internal/middleware/limithandler/middleware_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler" pb "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler/testdata" @@ -81,49 +82,66 @@ func TestStreamLimitHandler(t *testing.T) { t.Parallel() testCases := []struct { - desc string - fullname string - f func(*testing.T, context.Context, pb.TestClient) - maxConcurrency int - expectedRequestCount int + desc string + fullname string + f func(*testing.T, context.Context, pb.TestClient, chan interface{}, chan error) + maxConcurrency int + expectedRequestCount int + expectedResponseCount int }{ + // The max queue size is set at 1, which means 1 request + // will be queued while the later ones will return with + // an error. That means that maxConcurrency number of + // requests will be processing but blocked due to blockCh. + // 1 request will be waiting to be picked up, and will be + // processed once we close the blockCh. { desc: "Single request, multiple responses", fullname: "/test.limithandler.Test/StreamOutput", - f: func(t *testing.T, ctx context.Context, client pb.TestClient) { + f: func(t *testing.T, ctx context.Context, client pb.TestClient, respCh chan interface{}, errCh chan error) { stream, err := client.StreamOutput(ctx, &pb.StreamOutputRequest{}) require.NoError(t, err) require.NotNil(t, stream) r, err := stream.Recv() - require.NoError(t, err) + if err != nil { + errCh <- err + return + } require.NotNil(t, r) require.True(t, r.Ok) + respCh <- r }, - maxConcurrency: 3, - expectedRequestCount: 3, + maxConcurrency: 3, + expectedRequestCount: 4, + expectedResponseCount: 4, }, { desc: "Multiple requests, single response", fullname: "/test.limithandler.Test/StreamInput", - f: func(t *testing.T, ctx context.Context, client pb.TestClient) { + f: func(t *testing.T, ctx context.Context, client pb.TestClient, respCh chan interface{}, errCh chan error) { stream, err := client.StreamInput(ctx) require.NoError(t, err) require.NotNil(t, stream) require.NoError(t, stream.Send(&pb.StreamInputRequest{})) r, err := stream.CloseAndRecv() - require.NoError(t, err) + if err != nil { + errCh <- err + return + } require.NotNil(t, r) require.True(t, r.Ok) + respCh <- r }, - maxConcurrency: 3, - expectedRequestCount: 3, + maxConcurrency: 3, + expectedRequestCount: 4, + expectedResponseCount: 4, }, { desc: "Multiple requests, multiple responses", fullname: "/test.limithandler.Test/Bidirectional", - f: func(t *testing.T, ctx context.Context, client pb.TestClient) { + f: func(t *testing.T, ctx context.Context, client pb.TestClient, respCh chan interface{}, errCh chan error) { stream, err := client.Bidirectional(ctx) require.NoError(t, err) require.NotNil(t, stream) @@ -132,19 +150,24 @@ func TestStreamLimitHandler(t *testing.T) { require.NoError(t, stream.CloseSend()) r, err := stream.Recv() - require.NoError(t, err) + if err != nil { + errCh <- err + return + } require.NotNil(t, r) require.True(t, r.Ok) + respCh <- r }, - maxConcurrency: 3, - expectedRequestCount: 3, + maxConcurrency: 3, + expectedRequestCount: 4, + expectedResponseCount: 4, }, { // Make sure that _streams_ are limited but that _requests_ on each // allowed stream are not limited. desc: "Multiple requests with same id, multiple responses", fullname: "/test.limithandler.Test/Bidirectional", - f: func(t *testing.T, ctx context.Context, client pb.TestClient) { + f: func(t *testing.T, ctx context.Context, client pb.TestClient, respCh chan interface{}, errCh chan error) { stream, err := client.Bidirectional(ctx) require.NoError(t, err) require.NotNil(t, stream) @@ -158,29 +181,40 @@ func TestStreamLimitHandler(t *testing.T) { require.NoError(t, stream.CloseSend()) r, err := stream.Recv() - require.NoError(t, err) + if err != nil { + errCh <- err + return + } require.NotNil(t, r) require.True(t, r.Ok) + respCh <- r }, maxConcurrency: 3, // 3 (concurrent streams allowed) * 10 (requests per stream) - expectedRequestCount: 30, + // + 1 (queued stream) * (10 requests per stream) + expectedRequestCount: 40, + expectedResponseCount: 4, }, { desc: "With a max concurrency of 0", fullname: "/test.limithandler.Test/StreamOutput", - f: func(t *testing.T, ctx context.Context, client pb.TestClient) { + f: func(t *testing.T, ctx context.Context, client pb.TestClient, respCh chan interface{}, errCh chan error) { stream, err := client.StreamOutput(ctx, &pb.StreamOutputRequest{}) require.NoError(t, err) require.NotNil(t, stream) r, err := stream.Recv() - require.NoError(t, err) + if err != nil { + errCh <- err + return + } require.NotNil(t, r) require.True(t, r.Ok) + respCh <- r }, - maxConcurrency: 0, - expectedRequestCount: 10, // Allow all + maxConcurrency: 0, + expectedRequestCount: 10, + expectedResponseCount: 10, }, } @@ -190,9 +224,14 @@ func TestStreamLimitHandler(t *testing.T) { s := &server{blockCh: make(chan struct{})} + maxQueueSize := 1 cfg := config.Cfg{ Concurrency: []config.Concurrency{ - {RPC: tc.fullname, MaxPerRepo: tc.maxConcurrency}, + { + RPC: tc.fullname, + MaxPerRepo: tc.maxConcurrency, + MaxQueueSize: maxQueueSize, + }, }, } @@ -203,23 +242,40 @@ func TestStreamLimitHandler(t *testing.T) { client, conn := newClient(t, serverSocketPath) defer conn.Close() - ctx := testhelper.Context(t) + ctx := featureflag.IncomingCtxWithFeatureFlag( + testhelper.Context(t), + featureflag.ConcurrencyQueueEnforceMax, + true, + ) - wg := &sync.WaitGroup{} - for i := 0; i < 10; i++ { - wg.Add(1) + totalCalls := 10 + + errChan := make(chan error) + respChan := make(chan interface{}) + + for i := 0; i < totalCalls; i++ { go func() { - defer wg.Done() - tc.f(t, ctx, client) + tc.f(t, ctx, client, respChan, errChan) }() } - time.Sleep(100 * time.Millisecond) - - require.Equal(t, tc.expectedRequestCount, s.getRequestCount()) + if tc.maxConcurrency > 0 { + for i := 0; i < totalCalls-tc.maxConcurrency-maxQueueSize; i++ { + err := <-errChan + testhelper.RequireGrpcError( + t, + helper.ErrInternalf("rate limiting stream request: %w", + limithandler.ErrMaxQueueSize), err) + } + } close(s.blockCh) - wg.Wait() + + for i := 0; i < tc.expectedResponseCount; i++ { + <-respChan + } + + require.Equal(t, tc.expectedRequestCount, s.getRequestCount()) }) } } diff --git a/internal/middleware/limithandler/testhelper_test.go b/internal/middleware/limithandler/testhelper_test.go index 7ae5c7046..ce533d710 100644 --- a/internal/middleware/limithandler/testhelper_test.go +++ b/internal/middleware/limithandler/testhelper_test.go @@ -2,6 +2,7 @@ package limithandler_test import ( "context" + "io" "sync/atomic" pb "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler/testdata" @@ -41,6 +42,9 @@ func (s *server) StreamInput(stream pb.Test_StreamInputServer) error { // Read all the input for { if _, err := stream.Recv(); err != nil { + if err != io.EOF { + return err + } break } @@ -56,6 +60,9 @@ func (s *server) Bidirectional(stream pb.Test_BidirectionalServer) error { // Read all the input for { if _, err := stream.Recv(); err != nil { + if err != io.EOF { + return err + } break } -- cgit v1.2.3