diff options
author | John Cai <jcai@gitlab.com> | 2022-04-23 00:43:45 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2022-04-28 04:58:01 +0300 |
commit | 2dcb7931814eca9f2ecab20bcf235d51ccf3a978 (patch) | |
tree | 09fd4fe3a4447d607a64cba832a917dc521e1b90 | |
parent | 10d5f9bbf09a1405185519e34756925e8cbcc19f (diff) |
limithandler: Return error from limit handler (branch: jc-fix-limiter-streaming)
In c83f00059 (add concurrency queue limit 2022-01-24), we modified the
limit handler to return immediately with an error if certain limit
conditions are reached.
In wrappedStream's RecvMsg(), however, the limiter error is merely
logged. The call then blocks indefinitely: once the limiter returns an
error, it makes no further attempt to acquire a concurrency slot, so
this function waits until the context is cancelled.
Fix this by returning an error when we get an error from the limiter.
Changelog: fixed
-rw-r--r-- | internal/middleware/limithandler/middleware.go | 5 | ||||
-rw-r--r-- | internal/middleware/limithandler/middleware_test.go | 101 |
2 files changed, 106 insertions, 0 deletions
diff --git a/internal/middleware/limithandler/middleware.go b/internal/middleware/limithandler/middleware.go index 0d4ff6bbc..3f8bfeeeb 100644 --- a/internal/middleware/limithandler/middleware.go +++ b/internal/middleware/limithandler/middleware.go @@ -7,6 +7,7 @@ import ( grpcmwtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" "github.com/prometheus/client_golang/prometheus" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "google.golang.org/grpc" ) @@ -146,6 +147,7 @@ func (w *wrappedStream) RecvMsg(m interface{}) error { } ready := make(chan struct{}) + errs := make(chan error) go func() { if _, err := limiter.Limit(ctx, lockKey, func() (interface{}, error) { close(ready) @@ -153,6 +155,7 @@ func (w *wrappedStream) RecvMsg(m interface{}) error { return nil, nil }); err != nil { ctxlogrus.Extract(ctx).WithError(err).Error("rate limiting streaming request") + errs <- err } }() @@ -162,5 +165,7 @@ func (w *wrappedStream) RecvMsg(m interface{}) error { case <-ready: // It's our turn! 
return nil + case err := <-errs: + return helper.ErrInternalf("rate limiting stream request: %v", err) } } diff --git a/internal/middleware/limithandler/middleware_test.go b/internal/middleware/limithandler/middleware_test.go index 53f0f53f2..bb953f616 100644 --- a/internal/middleware/limithandler/middleware_test.go +++ b/internal/middleware/limithandler/middleware_test.go @@ -3,6 +3,7 @@ package limithandler_test import ( "bytes" "context" + "io" "net" "sync" "testing" @@ -12,6 +13,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler" pb "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler/testdata" @@ -221,6 +223,86 @@ func TestStreamLimitHandler(t *testing.T) { } } +func TestStreamLimitHandler_error(t *testing.T) { + t.Parallel() + + s := &queueTestServer{reqArrivedCh: make(chan struct{})} + s.blockCh = make(chan struct{}) + + cfg := config.Cfg{ + Concurrency: []config.Concurrency{ + {RPC: "/test.limithandler.Test/Bidirectional", MaxPerRepo: 1, MaxQueueSize: 1}, + }, + } + + lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters) + interceptor := lh.StreamInterceptor() + srv, serverSocketPath := runServer(t, s, grpc.StreamInterceptor(interceptor)) + defer srv.Stop() + + client, conn := newClient(t, serverSocketPath) + defer conn.Close() + + ctx := featureflag.IncomingCtxWithFeatureFlag( + testhelper.Context(t), + featureflag.ConcurrencyQueueEnforceMax, + true, + ) + + respChan := make(chan *pb.BidirectionalResponse) + go func() { + stream, err := client.Bidirectional(ctx) + require.NoError(t, err) + require.NoError(t, stream.Send(&pb.BidirectionalRequest{})) + require.NoError(t, stream.CloseSend()) + resp, err := stream.Recv() + require.NoError(t, 
err) + respChan <- resp + }() + // The first request will be blocked by blockCh. + <-s.reqArrivedCh + + // These are the second and third requests to be sent. + // The second request will be waiting in the queue. + // The third request should return with an error. + errChan := make(chan error) + for i := 0; i < 2; i++ { + go func() { + stream, err := client.Bidirectional(ctx) + require.NoError(t, err) + require.NotNil(t, stream) + require.NoError(t, stream.Send(&pb.BidirectionalRequest{})) + require.NoError(t, stream.CloseSend()) + resp, err := stream.Recv() + + if err != nil { + errChan <- err + } else { + respChan <- resp + } + }() + } + + err := <-errChan + testhelper.RequireGrpcError( + t, + helper.ErrInternalf("rate limiting stream request: %w", + limithandler.ErrMaxQueueSize), + err) + + // allow the first request to finish + close(s.blockCh) + + // This allows the second request to finish + <-s.reqArrivedCh + + // we expect two responses. The first request, and the second + // request. The third request returned immediately with an error + // from the limit handler. + <-respChan + <-respChan +} + type queueTestServer struct { server reqArrivedCh chan struct{} @@ -235,6 +317,25 @@ func (q *queueTestServer) Unary(ctx context.Context, in *pb.UnaryRequest) (*pb.U return &pb.UnaryResponse{Ok: true}, nil } +func (q *queueTestServer) Bidirectional(stream pb.Test_BidirectionalServer) error { + // Read all the input + for { + if _, err := stream.Recv(); err != nil { + if err != io.EOF { + return err + } + + break + } + q.reqArrivedCh <- struct{}{} + + q.registerRequest() + } + <-q.blockCh // Block to ensure concurrency + + return stream.Send(&pb.BidirectionalResponse{Ok: true}) +} + func TestConcurrencyLimitHandlerMetrics(t *testing.T) { s := &queueTestServer{reqArrivedCh: make(chan struct{})} s.blockCh = make(chan struct{}) |