| author | John Cai <jcai@gitlab.com> | 2022-02-08 18:15:16 +0300 |
|---|---|---|
| committer | John Cai <jcai@gitlab.com> | 2022-02-11 05:26:21 +0300 |
| commit | 605adfbce4dbac7ff0da69838b650fdb1f1fa244 (patch) | |
| tree | 640d0702f1c126209d3150138402a2d3640acac4 | |
| parent | 32b9777c9f3f217324d95d6e25b6ed1ddee13f68 (diff) | |
limithandler: Add metrics for queue limiting
Add counter metrics for requests dropped from the concurrency queue, either
because they timed out while waiting or because the queue was already at its
maximum size.
Changelog: added
5 files changed, 159 insertions, 16 deletions
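
The core pattern this commit introduces, before the diff itself: a Prometheus
counter vector labeled by drop reason, incremented from the limiter's monitor
hook whenever a request is dropped. A minimal self-contained sketch of that
pattern (names and wiring here are illustrative, not the exact Gitaly code):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// droppedMonitor is a hypothetical, stripped-down version of the monitor this
// commit extends: it owns a CounterVec with a "reason" label and bumps it
// whenever the limiter drops a request from its concurrency queue.
type droppedMonitor struct {
	requestsDropped *prometheus.CounterVec
}

func newDroppedMonitor() *droppedMonitor {
	return &droppedMonitor{
		requestsDropped: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: "requests_dropped_total",
				Help: "Number of requests dropped from the queue",
			},
			[]string{"reason"},
		),
	}
}

// Dropped records one dropped request under the given reason; the commit
// uses "max_size" (queue full) and "max_time" (queue wait timed out).
func (m *droppedMonitor) Dropped(reason string) {
	m.requestsDropped.WithLabelValues(reason).Inc()
}

func main() {
	m := newDroppedMonitor()
	m.Dropped("max_size")
	m.Dropped("max_time")
	fmt.Println("recorded one drop per reason")
}
```

A single counter with a reason label, rather than one counter per cause, keeps
the metric queryable both in aggregate and per reason.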
diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go
index 92e5f965d..9dbf87b75 100644
--- a/internal/middleware/limithandler/concurrency_limiter.go
+++ b/internal/middleware/limithandler/concurrency_limiter.go
@@ -15,6 +15,9 @@ import (
 // concurrency queue.
 var ErrMaxQueueTime = errors.New("maximum time in concurrency queue reached")
 
+// ErrMaxQueueSize indicates the concurrency queue has reached its maximum size
+var ErrMaxQueueSize = errors.New("maximum queue size reached")
+
 // LimitedFunc represents a function that will be limited
 type LimitedFunc func() (resp interface{}, err error)
 
@@ -117,7 +120,8 @@ func (c *ConcurrencyLimiter) queueInc(ctx context.Context) error {
 	if featureflag.ConcurrencyQueueEnforceMax.IsEnabled(ctx) &&
 		c.queuedLimit > 0 &&
 		c.queued >= c.queuedLimit {
-		return errors.New("maximum queue size reached")
+		c.monitor.Dropped(ctx, "max_size")
+		return ErrMaxQueueSize
 	}
 
 	c.queued++
@@ -159,6 +163,9 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey string, f Limite
 	c.monitor.Dequeued(ctx)
 	if err != nil {
+		if errors.Is(err, ErrMaxQueueTime) {
+			c.monitor.Dropped(ctx, "max_time")
+		}
 		return nil, err
 	}
 	defer sem.release()
diff --git a/internal/middleware/limithandler/concurrency_limiter_test.go b/internal/middleware/limithandler/concurrency_limiter_test.go
index e581b17e2..bbeda4d76 100644
--- a/internal/middleware/limithandler/concurrency_limiter_test.go
+++ b/internal/middleware/limithandler/concurrency_limiter_test.go
@@ -16,12 +16,14 @@ import (
 type counter struct {
 	sync.Mutex
 
-	max      int
-	current  int
-	queued   int
-	dequeued int
-	enter    int
-	exit     int
+	max         int
+	current     int
+	queued      int
+	dequeued    int
+	enter       int
+	exit        int
+	droppedSize int
+	droppedTime int
 }
 
 func (c *counter) up() {
@@ -71,6 +73,15 @@ func (c *counter) Exit(ctx context.Context) {
 	c.exit++
 }
 
+func (c *counter) Dropped(ctx context.Context, reason string) {
+	switch reason {
+	case "max_time":
+		c.droppedTime++
+	case "max_size":
+		c.droppedSize++
+	}
+}
+
 func TestLimiter(t *testing.T) {
 	t.Parallel()
 
@@ -253,9 +264,9 @@ func TestConcurrencyLimiter_queueLimit(t *testing.T) {
 	)
 
 	monitorCh := make(chan struct{})
-	gauge := &blockingQueueCounter{queuedCh: monitorCh}
+	monitor := &blockingQueueCounter{queuedCh: monitorCh}
 	ch := make(chan struct{})
-	limiter := NewLimiter(1, queueLimit, nil, gauge)
+	limiter := NewLimiter(1, queueLimit, nil, monitor)
 
 	// occupied with one live request that takes a long time to complete
 	go func() {
@@ -303,13 +314,14 @@ func TestConcurrencyLimiter_queueLimit(t *testing.T) {
 			err := <-errChan
 			assert.Error(t, err)
 			assert.Equal(t, "maximum queue size reached", err.Error())
+			assert.Equal(t, monitor.droppedSize, 1)
 		} else {
 			<-monitorCh
 			assert.Equal(t, int64(queueLimit+1), limiter.queued)
+			assert.Equal(t, monitor.droppedSize, 0)
 		}
 
 		close(ch)
-
 		wg.Wait()
 	})
 }
@@ -341,14 +353,15 @@ func TestLimitConcurrency_queueWaitTime(t *testing.T) {
 		ticker := helper.NewManualTicker()
 
 		dequeuedCh := make(chan struct{})
-		gauge := &blockingDequeueCounter{dequeuedCh: dequeuedCh}
+		monitor := &blockingDequeueCounter{dequeuedCh: dequeuedCh}
+
 		limiter := NewLimiter(
 			1,
 			0,
 			func() helper.Ticker { return ticker },
-			gauge,
+			monitor,
 		)
 
 		ch := make(chan struct{})
@@ -379,7 +392,7 @@ func TestLimitConcurrency_queueWaitTime(t *testing.T) {
 		err := <-errChan
 		assert.Equal(t, ErrMaxQueueTime, err)
-
+		assert.Equal(t, monitor.droppedTime, 1)
 		close(ch)
 		wg.Wait()
 	})
@@ -394,14 +407,15 @@ func TestLimitConcurrency_queueWaitTime(t *testing.T) {
 		ticker := helper.NewManualTicker()
 
 		dequeuedCh := make(chan struct{})
-		gauge := &blockingDequeueCounter{dequeuedCh: dequeuedCh}
+		monitor := &blockingDequeueCounter{dequeuedCh: dequeuedCh}
+
 		limiter := NewLimiter(
 			1,
 			0,
 			func() helper.Ticker { return ticker },
-			gauge,
+			monitor,
 		)
 
 		ch := make(chan struct{})
@@ -430,5 +444,6 @@ func TestLimitConcurrency_queueWaitTime(t *testing.T) {
 		err := <-errChan
 		assert.NoError(t, err)
+		assert.Equal(t, monitor.droppedTime, 0)
 	})
 }
diff --git a/internal/middleware/limithandler/middleware.go b/internal/middleware/limithandler/middleware.go
index 347ceccf8..ac33ff4b1 100644
--- a/internal/middleware/limithandler/middleware.go
+++ b/internal/middleware/limithandler/middleware.go
@@ -38,6 +38,7 @@ type LimiterMiddleware struct {
 	acquiringSecondsMetric *prometheus.HistogramVec
 	inProgressMetric       *prometheus.GaugeVec
 	queuedMetric           *prometheus.GaugeVec
+	requestsDroppedMetric  *prometheus.CounterVec
 }
 
 // New creates a new rate limiter
@@ -73,6 +74,18 @@ func New(cfg config.Cfg, getLockKey GetLockKey) *LimiterMiddleware {
 			},
 			[]string{"system", "grpc_service", "grpc_method"},
 		),
+		requestsDroppedMetric: prometheus.NewCounterVec(
+			prometheus.CounterOpts{
+				Name: "gitaly_requests_dropped_total",
+				Help: "Number of requests dropped from the queue",
+			},
+			[]string{
+				"system",
+				"grpc_service",
+				"grpc_method",
+				"reason",
+			},
+		),
 	}
 	middleware.methodLimiters = createLimiterConfig(middleware, cfg)
 	return middleware
@@ -88,6 +101,7 @@ func (c *LimiterMiddleware) Collect(metrics chan<- prometheus.Metric) {
 	c.acquiringSecondsMetric.Collect(metrics)
 	c.inProgressMetric.Collect(metrics)
 	c.queuedMetric.Collect(metrics)
+	c.requestsDroppedMetric.Collect(metrics)
 }
 
 // UnaryInterceptor returns a Unary Interceptor
diff --git a/internal/middleware/limithandler/middleware_test.go b/internal/middleware/limithandler/middleware_test.go
index cfd7380e0..0c63127f9 100644
--- a/internal/middleware/limithandler/middleware_test.go
+++ b/internal/middleware/limithandler/middleware_test.go
@@ -1,15 +1,18 @@
 package limithandler_test
 
 import (
+	"bytes"
 	"context"
 	"net"
 	"sync"
 	"testing"
 	"time"
 
+	promtest "github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
+	"gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
 	"gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler"
 	pb "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler/testdata"
 	"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
@@ -216,7 +219,99 @@ func TestStreamLimitHandler(t *testing.T) {
 	}
 }
 
-func runServer(t *testing.T, s *server, opt ...grpc.ServerOption) (*grpc.Server, string) {
+type queueTestServer struct {
+	server
+	reqArrivedCh chan struct{}
+}
+
+func (q *queueTestServer) Unary(ctx context.Context, in *pb.UnaryRequest) (*pb.UnaryResponse, error) {
+	q.registerRequest()
+
+	q.reqArrivedCh <- struct{}{} // We need a way to know when a request got to the middleware
+	<-q.blockCh                  // Block to ensure concurrency
+
+	return &pb.UnaryResponse{Ok: true}, nil
+}
+
+func TestLimitHandlerMetrics(t *testing.T) {
+	s := &queueTestServer{reqArrivedCh: make(chan struct{})}
+	s.blockCh = make(chan struct{})
+
+	methodName := "/test.limithandler.Test/Unary"
+	cfg := config.Cfg{
+		Concurrency: []config.Concurrency{
+			{RPC: methodName, MaxPerRepo: 1, MaxQueueSize: 1},
+		},
+	}
+
+	lh := limithandler.New(cfg, fixedLockKey)
+	interceptor := lh.UnaryInterceptor()
+	srv, serverSocketPath := runServer(t, s, grpc.UnaryInterceptor(interceptor))
+	defer srv.Stop()
+
+	client, conn := newClient(t, serverSocketPath)
+	defer conn.Close()
+
+	ctx := featureflag.IncomingCtxWithFeatureFlag(
+		testhelper.Context(t),
+		featureflag.ConcurrencyQueueEnforceMax,
+		true,
+	)
+
+	go func() {
+		_, err := client.Unary(ctx, &pb.UnaryRequest{})
+		require.NoError(t, err)
+	}()
+	// wait until the first request is being processed. After this, requests will be queued
+	<-s.reqArrivedCh
+
+	respCh := make(chan *pb.UnaryResponse)
+	errChan := make(chan error)
+	// out of ten requests, the first one will be queued and the other 9 will return with
+	// an error
+	for i := 0; i < 10; i++ {
+		go func() {
+			resp, err := client.Unary(ctx, &pb.UnaryRequest{})
+			if err != nil {
+				errChan <- err
+			} else {
+				respCh <- resp
+			}
+		}()
+	}
+
+	var errs int
+	for err := range errChan {
+		testhelper.RequireGrpcError(t, limithandler.ErrMaxQueueSize, err)
+		errs++
+		if errs == 9 {
+			break
+		}
+	}
+
+	expectedMetrics := `# HELP gitaly_rate_limiting_in_progress Gauge of number of concurrent in-progress calls
+# TYPE gitaly_rate_limiting_in_progress gauge
+gitaly_rate_limiting_in_progress{grpc_method="ReplicateRepository",grpc_service="gitaly.RepositoryService",system="gitaly"} 0
+gitaly_rate_limiting_in_progress{grpc_method="Unary",grpc_service="test.limithandler.Test",system="gitaly"} 1
+# HELP gitaly_rate_limiting_queued Gauge of number of queued calls
+# TYPE gitaly_rate_limiting_queued gauge
+gitaly_rate_limiting_queued{grpc_method="ReplicateRepository",grpc_service="gitaly.RepositoryService",system="gitaly"} 0
+gitaly_rate_limiting_queued{grpc_method="Unary",grpc_service="test.limithandler.Test",system="gitaly"} 1
+# HELP gitaly_requests_dropped_total Number of requests dropped from the queue
+# TYPE gitaly_requests_dropped_total counter
+gitaly_requests_dropped_total{grpc_method="Unary",grpc_service="test.limithandler.Test",reason="max_size",system="gitaly"} 9
+`
+	assert.NoError(t, promtest.CollectAndCompare(lh, bytes.NewBufferString(expectedMetrics),
+		"gitaly_rate_limiting_queued",
+		"gitaly_requests_dropped_total",
+		"gitaly_rate_limiting_in_progress"))
+
+	close(s.blockCh)
+	<-s.reqArrivedCh
+	<-respCh
+}
+
+func runServer(t *testing.T, s pb.TestServer, opt ...grpc.ServerOption) (*grpc.Server, string) {
 	serverSocketPath := testhelper.GetTemporaryGitalySocketFileName(t)
 	grpcServer := grpc.NewServer(opt...)
 	pb.RegisterTestServer(grpcServer, s)
diff --git a/internal/middleware/limithandler/monitor.go b/internal/middleware/limithandler/monitor.go
index 40c3869dc..f77014b9d 100644
--- a/internal/middleware/limithandler/monitor.go
+++ b/internal/middleware/limithandler/monitor.go
@@ -17,6 +17,7 @@ type ConcurrencyMonitor interface {
 	Dequeued(ctx context.Context)
 	Enter(ctx context.Context, acquireTime time.Duration)
 	Exit(ctx context.Context)
+	Dropped(ctx context.Context, message string)
 }
 
 type nullConcurrencyMonitor struct{}
@@ -25,11 +26,13 @@ func (c *nullConcurrencyMonitor) Queued(ctx context.Context)
 func (c *nullConcurrencyMonitor) Dequeued(ctx context.Context)                          {}
 func (c *nullConcurrencyMonitor) Enter(ctx context.Context, acquireTime time.Duration) {}
 func (c *nullConcurrencyMonitor) Exit(ctx context.Context)                              {}
+func (c *nullConcurrencyMonitor) Dropped(ctx context.Context, reason string)           {}
 
 type promMonitor struct {
 	queuedMetric           prometheus.Gauge
 	inProgressMetric       prometheus.Gauge
 	acquiringSecondsMetric prometheus.Observer
+	requestsDroppedMetric  *prometheus.CounterVec
 }
 
 // newPromMonitor creates a new ConcurrencyMonitor that tracks limiter
@@ -41,6 +44,11 @@ func newPromMonitor(lh *LimiterMiddleware, system string, fullMethod string) Con
 		lh.queuedMetric.WithLabelValues(system, serviceName, methodName),
 		lh.inProgressMetric.WithLabelValues(system, serviceName, methodName),
 		lh.acquiringSecondsMetric.WithLabelValues(system, serviceName, methodName),
+		lh.requestsDroppedMetric.MustCurryWith(prometheus.Labels{
+			"system":       system,
+			"grpc_service": serviceName,
+			"grpc_method":  methodName,
+		}),
 	}
 }
 
@@ -67,6 +75,10 @@ func (c *promMonitor) Exit(ctx context.Context) {
 	c.inProgressMetric.Dec()
 }
 
+func (c *promMonitor) Dropped(ctx context.Context, reason string) {
+	c.requestsDroppedMetric.WithLabelValues(reason).Inc()
+}
+
 func splitMethodName(fullMethodName string) (string, string) {
 	fullMethodName = strings.TrimPrefix(fullMethodName, "/") // remove leading slash
 	if i := strings.Index(fullMethodName, "/"); i >= 0 {
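
A detail in monitor.go worth noting: the middleware registers one CounterVec
with four labels, and each per-RPC monitor pre-binds system, grpc_service, and
grpc_method via MustCurryWith, so Dropped only supplies the remaining reason
label. A small sketch of that currying behavior (label values are illustrative):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// The full vector as the middleware would register it: four labels.
	dropped := prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "requests_dropped_total",
			Help: "Number of requests dropped from the queue",
		},
		[]string{"system", "grpc_service", "grpc_method", "reason"},
	)

	// Per-RPC view: three labels fixed up front; only "reason" remains free.
	perRPC := dropped.MustCurryWith(prometheus.Labels{
		"system":       "gitaly",
		"grpc_service": "test.limithandler.Test",
		"grpc_method":  "Unary",
	})

	// This one-liner is all the per-RPC monitor's Dropped() needs to do.
	perRPC.WithLabelValues("max_size").Inc()

	// Prints 1: the increment landed on the fully-labeled series.
	fmt.Println(testutil.ToFloat64(perRPC.WithLabelValues("max_size")))
}
```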