diff options
author | John Cai <jcai@gitlab.com> | 2022-04-29 17:51:19 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2022-04-29 17:51:19 +0300 |
commit | c47bb5beba81703b7c2bb2b3d1c138cb667fa80c (patch) | |
tree | 5935a10dc5e554c0fc5462e14842e189ac63a619 | |
parent | b3a0b630bd33b6f23902203d5443c7518b316b41 (diff) | |
parent | de291225acc1244454075466cb0986e5bf7072fa (diff) |
Merge branch 'jc-fix-limiter-streaming' into 'master'
limithandler: Return error from limit handler
See merge request gitlab-org/gitaly!4492
-rw-r--r-- | internal/middleware/limithandler/middleware.go | 7 | ||||
-rw-r--r-- | internal/middleware/limithandler/middleware_test.go | 101 |
2 files changed, 106 insertions, 2 deletions
diff --git a/internal/middleware/limithandler/middleware.go b/internal/middleware/limithandler/middleware.go index 0d4ff6bbc..7ee2aa37f 100644 --- a/internal/middleware/limithandler/middleware.go +++ b/internal/middleware/limithandler/middleware.go @@ -3,10 +3,10 @@ package limithandler import ( "context" - "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" grpcmwtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" "github.com/prometheus/client_golang/prometheus" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "google.golang.org/grpc" ) @@ -146,13 +146,14 @@ func (w *wrappedStream) RecvMsg(m interface{}) error { } ready := make(chan struct{}) + errs := make(chan error) go func() { if _, err := limiter.Limit(ctx, lockKey, func() (interface{}, error) { close(ready) <-ctx.Done() return nil, nil }); err != nil { - ctxlogrus.Extract(ctx).WithError(err).Error("rate limiting streaming request") + errs <- err } }() @@ -162,5 +163,7 @@ func (w *wrappedStream) RecvMsg(m interface{}) error { case <-ready: // It's our turn! return nil + case err := <-errs: + return helper.ErrInternalf("rate limiting stream request: %v", err) } } diff --git a/internal/middleware/limithandler/middleware_test.go b/internal/middleware/limithandler/middleware_test.go index 53f0f53f2..bb953f616 100644 --- a/internal/middleware/limithandler/middleware_test.go +++ b/internal/middleware/limithandler/middleware_test.go @@ -3,6 +3,7 @@ package limithandler_test import ( "bytes" "context" + "io" "net" "sync" "testing" @@ -12,6 +13,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler" pb "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler/testdata" @@ -221,6 +223,86 @@ func TestStreamLimitHandler(t *testing.T) { } } +func TestStreamLimitHandler_error(t *testing.T) { + t.Parallel() + + s := &queueTestServer{reqArrivedCh: make(chan struct{})} + s.blockCh = make(chan struct{}) + + cfg := config.Cfg{ + Concurrency: []config.Concurrency{ + {RPC: "/test.limithandler.Test/Bidirectional", MaxPerRepo: 1, MaxQueueSize: 1}, + }, + } + + lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters) + interceptor := lh.StreamInterceptor() + srv, serverSocketPath := runServer(t, s, grpc.StreamInterceptor(interceptor)) + defer srv.Stop() + + client, conn := newClient(t, serverSocketPath) + defer conn.Close() + + ctx := featureflag.IncomingCtxWithFeatureFlag( + testhelper.Context(t), + featureflag.ConcurrencyQueueEnforceMax, + true, + ) + + respChan := make(chan *pb.BidirectionalResponse) + go func() { + stream, err := client.Bidirectional(ctx) + require.NoError(t, err) + require.NoError(t, stream.Send(&pb.BidirectionalRequest{})) + require.NoError(t, stream.CloseSend()) + resp, err := stream.Recv() + require.NoError(t, err) + respChan <- resp + }() + // The first request will be blocked by blockCh. + <-s.reqArrivedCh + + // These are the second and third requests to be sent. + // The second request will be waiting in the queue. + // The third request should return with an error. + errChan := make(chan error) + for i := 0; i < 2; i++ { + go func() { + stream, err := client.Bidirectional(ctx) + require.NoError(t, err) + require.NotNil(t, stream) + require.NoError(t, stream.Send(&pb.BidirectionalRequest{})) + require.NoError(t, stream.CloseSend()) + resp, err := stream.Recv() + + if err != nil { + errChan <- err + } else { + respChan <- resp + } + }() + } + + err := <-errChan + testhelper.RequireGrpcError( + t, + helper.ErrInternalf("rate limiting stream request: %w", + limithandler.ErrMaxQueueSize), + err) + + // allow the first request to finish + close(s.blockCh) + + // This allows the second request to finish + <-s.reqArrivedCh + + // we expect two responses. The first request, and the second + // request. The third request returned immediately with an error + // from the limit handler. + <-respChan + <-respChan +} + type queueTestServer struct { server reqArrivedCh chan struct{} @@ -235,6 +317,25 @@ func (q *queueTestServer) Unary(ctx context.Context, in *pb.UnaryRequest) (*pb.U return &pb.UnaryResponse{Ok: true}, nil } +func (q *queueTestServer) Bidirectional(stream pb.Test_BidirectionalServer) error { + // Read all the input + for { + if _, err := stream.Recv(); err != nil { + if err != io.EOF { + return err + } + + break + } + q.reqArrivedCh <- struct{}{} + + q.registerRequest() + } + <-q.blockCh // Block to ensure concurrency + + return stream.Send(&pb.BidirectionalResponse{Ok: true}) +} + func TestConcurrencyLimitHandlerMetrics(t *testing.T) { s := &queueTestServer{reqArrivedCh: make(chan struct{})} s.blockCh = make(chan struct{}) |