Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2022-04-29 17:51:19 +0300
committerJohn Cai <jcai@gitlab.com>2022-04-29 17:51:19 +0300
commitc47bb5beba81703b7c2bb2b3d1c138cb667fa80c (patch)
tree5935a10dc5e554c0fc5462e14842e189ac63a619
parentb3a0b630bd33b6f23902203d5443c7518b316b41 (diff)
parentde291225acc1244454075466cb0986e5bf7072fa (diff)
Merge branch 'jc-fix-limiter-streaming' into 'master'
limithandler: Return error from limit handler See merge request gitlab-org/gitaly!4492
-rw-r--r--internal/middleware/limithandler/middleware.go7
-rw-r--r--internal/middleware/limithandler/middleware_test.go101
2 files changed, 106 insertions, 2 deletions
diff --git a/internal/middleware/limithandler/middleware.go b/internal/middleware/limithandler/middleware.go
index 0d4ff6bbc..7ee2aa37f 100644
--- a/internal/middleware/limithandler/middleware.go
+++ b/internal/middleware/limithandler/middleware.go
@@ -3,10 +3,10 @@ package limithandler
import (
"context"
- "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
grpcmwtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
"github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"google.golang.org/grpc"
)
@@ -146,13 +146,14 @@ func (w *wrappedStream) RecvMsg(m interface{}) error {
}
ready := make(chan struct{})
+ errs := make(chan error)
go func() {
if _, err := limiter.Limit(ctx, lockKey, func() (interface{}, error) {
close(ready)
<-ctx.Done()
return nil, nil
}); err != nil {
- ctxlogrus.Extract(ctx).WithError(err).Error("rate limiting streaming request")
+ errs <- err
}
}()
@@ -162,5 +163,7 @@ func (w *wrappedStream) RecvMsg(m interface{}) error {
case <-ready:
// It's our turn!
return nil
+ case err := <-errs:
+ return helper.ErrInternalf("rate limiting stream request: %v", err)
}
}
diff --git a/internal/middleware/limithandler/middleware_test.go b/internal/middleware/limithandler/middleware_test.go
index 53f0f53f2..bb953f616 100644
--- a/internal/middleware/limithandler/middleware_test.go
+++ b/internal/middleware/limithandler/middleware_test.go
@@ -3,6 +3,7 @@ package limithandler_test
import (
"bytes"
"context"
+ "io"
"net"
"sync"
"testing"
@@ -12,6 +13,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler"
pb "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler/testdata"
@@ -221,6 +223,86 @@ func TestStreamLimitHandler(t *testing.T) {
}
}
+func TestStreamLimitHandler_error(t *testing.T) {
+ t.Parallel()
+
+ s := &queueTestServer{reqArrivedCh: make(chan struct{})}
+ s.blockCh = make(chan struct{})
+
+ cfg := config.Cfg{
+ Concurrency: []config.Concurrency{
+ {RPC: "/test.limithandler.Test/Bidirectional", MaxPerRepo: 1, MaxQueueSize: 1},
+ },
+ }
+
+ lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters)
+ interceptor := lh.StreamInterceptor()
+ srv, serverSocketPath := runServer(t, s, grpc.StreamInterceptor(interceptor))
+ defer srv.Stop()
+
+ client, conn := newClient(t, serverSocketPath)
+ defer conn.Close()
+
+ ctx := featureflag.IncomingCtxWithFeatureFlag(
+ testhelper.Context(t),
+ featureflag.ConcurrencyQueueEnforceMax,
+ true,
+ )
+
+ respChan := make(chan *pb.BidirectionalResponse)
+ go func() {
+ stream, err := client.Bidirectional(ctx)
+ require.NoError(t, err)
+ require.NoError(t, stream.Send(&pb.BidirectionalRequest{}))
+ require.NoError(t, stream.CloseSend())
+ resp, err := stream.Recv()
+ require.NoError(t, err)
+ respChan <- resp
+ }()
+ // The first request will be blocked by blockCh.
+ <-s.reqArrivedCh
+
+ // These are the second and third requests to be sent.
+ // The second request will be waiting in the queue.
+ // The third request should return with an error.
+ errChan := make(chan error)
+ for i := 0; i < 2; i++ {
+ go func() {
+ stream, err := client.Bidirectional(ctx)
+ require.NoError(t, err)
+ require.NotNil(t, stream)
+ require.NoError(t, stream.Send(&pb.BidirectionalRequest{}))
+ require.NoError(t, stream.CloseSend())
+ resp, err := stream.Recv()
+
+ if err != nil {
+ errChan <- err
+ } else {
+ respChan <- resp
+ }
+ }()
+ }
+
+ err := <-errChan
+ testhelper.RequireGrpcError(
+ t,
+ helper.ErrInternalf("rate limiting stream request: %w",
+ limithandler.ErrMaxQueueSize),
+ err)
+
+ // allow the first request to finish
+ close(s.blockCh)
+
+ // This allows the second request to finish
+ <-s.reqArrivedCh
+
+ // we expect two responses. The first request, and the second
+ // request. The third request returned immediately with an error
+ // from the limit handler.
+ <-respChan
+ <-respChan
+}
+
type queueTestServer struct {
server
reqArrivedCh chan struct{}
@@ -235,6 +317,25 @@ func (q *queueTestServer) Unary(ctx context.Context, in *pb.UnaryRequest) (*pb.U
return &pb.UnaryResponse{Ok: true}, nil
}
+func (q *queueTestServer) Bidirectional(stream pb.Test_BidirectionalServer) error {
+ // Read all the input
+ for {
+ if _, err := stream.Recv(); err != nil {
+ if err != io.EOF {
+ return err
+ }
+
+ break
+ }
+ q.reqArrivedCh <- struct{}{}
+
+ q.registerRequest()
+ }
+ <-q.blockCh // Block to ensure concurrency
+
+ return stream.Send(&pb.BidirectionalResponse{Ok: true})
+}
+
func TestConcurrencyLimitHandlerMetrics(t *testing.T) {
s := &queueTestServer{reqArrivedCh: make(chan struct{})}
s.blockCh = make(chan struct{})