Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2022-04-23 00:43:45 +0300
committerJohn Cai <jcai@gitlab.com>2022-04-28 04:58:01 +0300
commit2dcb7931814eca9f2ecab20bcf235d51ccf3a978 (patch)
tree09fd4fe3a4447d607a64cba832a917dc521e1b90
parent10d5f9bbf09a1405185519e34756925e8cbcc19f (diff)
limithandler: Return error from limit handlerjc-fix-limiter-streaming
In c83f00059 (add concurrency queue limit 2022-01-24), we modified the limit handler to return immediately with an error if certain limit conditions are reached. In the wrappedStream however, RecvMsg() merely logs the error. This will block indefinitely, because when the limiter returns with an error, it will no longer try to get a concurrency slot. Thus this function will wait until the context is cancelled. Fix this by returning an error when we get an error from the limiter. Changelog: fixed
-rw-r--r--internal/middleware/limithandler/middleware.go5
-rw-r--r--internal/middleware/limithandler/middleware_test.go101
2 files changed, 106 insertions, 0 deletions
diff --git a/internal/middleware/limithandler/middleware.go b/internal/middleware/limithandler/middleware.go
index 0d4ff6bbc..3f8bfeeeb 100644
--- a/internal/middleware/limithandler/middleware.go
+++ b/internal/middleware/limithandler/middleware.go
@@ -7,6 +7,7 @@ import (
grpcmwtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
"github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"google.golang.org/grpc"
)
@@ -146,6 +147,7 @@ func (w *wrappedStream) RecvMsg(m interface{}) error {
}
ready := make(chan struct{})
+ errs := make(chan error)
go func() {
if _, err := limiter.Limit(ctx, lockKey, func() (interface{}, error) {
close(ready)
@@ -153,6 +155,7 @@ func (w *wrappedStream) RecvMsg(m interface{}) error {
return nil, nil
}); err != nil {
ctxlogrus.Extract(ctx).WithError(err).Error("rate limiting streaming request")
+ errs <- err
}
}()
@@ -162,5 +165,7 @@ func (w *wrappedStream) RecvMsg(m interface{}) error {
case <-ready:
// It's our turn!
return nil
+ case err := <-errs:
+ return helper.ErrInternalf("rate limiting stream request: %v", err)
}
}
diff --git a/internal/middleware/limithandler/middleware_test.go b/internal/middleware/limithandler/middleware_test.go
index 53f0f53f2..bb953f616 100644
--- a/internal/middleware/limithandler/middleware_test.go
+++ b/internal/middleware/limithandler/middleware_test.go
@@ -3,6 +3,7 @@ package limithandler_test
import (
"bytes"
"context"
+ "io"
"net"
"sync"
"testing"
@@ -12,6 +13,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler"
pb "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler/testdata"
@@ -221,6 +223,86 @@ func TestStreamLimitHandler(t *testing.T) {
}
}
+func TestStreamLimitHandler_error(t *testing.T) {
+ t.Parallel()
+
+ s := &queueTestServer{reqArrivedCh: make(chan struct{})}
+ s.blockCh = make(chan struct{})
+
+ cfg := config.Cfg{
+ Concurrency: []config.Concurrency{
+ {RPC: "/test.limithandler.Test/Bidirectional", MaxPerRepo: 1, MaxQueueSize: 1},
+ },
+ }
+
+ lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters)
+ interceptor := lh.StreamInterceptor()
+ srv, serverSocketPath := runServer(t, s, grpc.StreamInterceptor(interceptor))
+ defer srv.Stop()
+
+ client, conn := newClient(t, serverSocketPath)
+ defer conn.Close()
+
+ ctx := featureflag.IncomingCtxWithFeatureFlag(
+ testhelper.Context(t),
+ featureflag.ConcurrencyQueueEnforceMax,
+ true,
+ )
+
+ respChan := make(chan *pb.BidirectionalResponse)
+ go func() {
+ stream, err := client.Bidirectional(ctx)
+ require.NoError(t, err)
+ require.NoError(t, stream.Send(&pb.BidirectionalRequest{}))
+ require.NoError(t, stream.CloseSend())
+ resp, err := stream.Recv()
+ require.NoError(t, err)
+ respChan <- resp
+ }()
+ // The first request will be blocked by blockCh.
+ <-s.reqArrivedCh
+
+ // These are the second and third requests to be sent.
+ // The second request will be waiting in the queue.
+ // The third request should return with an error.
+ errChan := make(chan error)
+ for i := 0; i < 2; i++ {
+ go func() {
+ stream, err := client.Bidirectional(ctx)
+ require.NoError(t, err)
+ require.NotNil(t, stream)
+ require.NoError(t, stream.Send(&pb.BidirectionalRequest{}))
+ require.NoError(t, stream.CloseSend())
+ resp, err := stream.Recv()
+
+ if err != nil {
+ errChan <- err
+ } else {
+ respChan <- resp
+ }
+ }()
+ }
+
+ err := <-errChan
+ testhelper.RequireGrpcError(
+ t,
+ helper.ErrInternalf("rate limiting stream request: %w",
+ limithandler.ErrMaxQueueSize),
+ err)
+
+ // allow the first request to finish
+ close(s.blockCh)
+
+ // This allows the second request to finish
+ <-s.reqArrivedCh
+
+ // we expect two responses. The first request, and the second
+ // request. The third request returned immediately with an error
+ // from the limit handler.
+ <-respChan
+ <-respChan
+}
+
type queueTestServer struct {
server
reqArrivedCh chan struct{}
@@ -235,6 +317,25 @@ func (q *queueTestServer) Unary(ctx context.Context, in *pb.UnaryRequest) (*pb.U
return &pb.UnaryResponse{Ok: true}, nil
}
+func (q *queueTestServer) Bidirectional(stream pb.Test_BidirectionalServer) error {
+ // Read all the input
+ for {
+ if _, err := stream.Recv(); err != nil {
+ if err != io.EOF {
+ return err
+ }
+
+ break
+ }
+ q.reqArrivedCh <- struct{}{}
+
+ q.registerRequest()
+ }
+ <-q.blockCh // Block to ensure concurrency
+
+ return stream.Send(&pb.BidirectionalResponse{Ok: true})
+}
+
func TestConcurrencyLimitHandlerMetrics(t *testing.T) {
s := &queueTestServer{reqArrivedCh: make(chan struct{})}
s.blockCh = make(chan struct{})