Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2022-02-08 18:15:16 +0300
committerJohn Cai <jcai@gitlab.com>2022-02-11 05:26:21 +0300
commit605adfbce4dbac7ff0da69838b650fdb1f1fa244 (patch)
tree640d0702f1c126209d3150138402a2d3640acac4
parent32b9777c9f3f217324d95d6e25b6ed1ddee13f68 (diff)
limithandler: Add metrics for queue limiting
Add counter metrics for when requests are dropped due to timing out in the concurrency queue, or when the queue size is maxed out. Changelog: added
-rw-r--r--internal/middleware/limithandler/concurrency_limiter.go9
-rw-r--r--internal/middleware/limithandler/concurrency_limiter_test.go43
-rw-r--r--internal/middleware/limithandler/middleware.go14
-rw-r--r--internal/middleware/limithandler/middleware_test.go97
-rw-r--r--internal/middleware/limithandler/monitor.go12
5 files changed, 159 insertions, 16 deletions
diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go
index 92e5f965d..9dbf87b75 100644
--- a/internal/middleware/limithandler/concurrency_limiter.go
+++ b/internal/middleware/limithandler/concurrency_limiter.go
@@ -15,6 +15,9 @@ import (
// concurrency queue.
var ErrMaxQueueTime = errors.New("maximum time in concurrency queue reached")
+// ErrMaxQueueSize indicates the concurrency queue has reached its maximum size
+var ErrMaxQueueSize = errors.New("maximum queue size reached")
+
// LimitedFunc represents a function that will be limited
type LimitedFunc func() (resp interface{}, err error)
@@ -117,7 +120,8 @@ func (c *ConcurrencyLimiter) queueInc(ctx context.Context) error {
if featureflag.ConcurrencyQueueEnforceMax.IsEnabled(ctx) &&
c.queuedLimit > 0 &&
c.queued >= c.queuedLimit {
- return errors.New("maximum queue size reached")
+ c.monitor.Dropped(ctx, "max_size")
+ return ErrMaxQueueSize
}
c.queued++
@@ -159,6 +163,9 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey string, f Limite
c.monitor.Dequeued(ctx)
if err != nil {
+ if errors.Is(err, ErrMaxQueueTime) {
+ c.monitor.Dropped(ctx, "max_time")
+ }
return nil, err
}
defer sem.release()
diff --git a/internal/middleware/limithandler/concurrency_limiter_test.go b/internal/middleware/limithandler/concurrency_limiter_test.go
index e581b17e2..bbeda4d76 100644
--- a/internal/middleware/limithandler/concurrency_limiter_test.go
+++ b/internal/middleware/limithandler/concurrency_limiter_test.go
@@ -16,12 +16,14 @@ import (
type counter struct {
sync.Mutex
- max int
- current int
- queued int
- dequeued int
- enter int
- exit int
+ max int
+ current int
+ queued int
+ dequeued int
+ enter int
+ exit int
+ droppedSize int
+ droppedTime int
}
func (c *counter) up() {
@@ -71,6 +73,15 @@ func (c *counter) Exit(ctx context.Context) {
c.exit++
}
+func (c *counter) Dropped(ctx context.Context, reason string) {
+ switch reason {
+ case "max_time":
+ c.droppedTime++
+ case "max_size":
+ c.droppedSize++
+ }
+}
+
func TestLimiter(t *testing.T) {
t.Parallel()
@@ -253,9 +264,9 @@ func TestConcurrencyLimiter_queueLimit(t *testing.T) {
)
monitorCh := make(chan struct{})
- gauge := &blockingQueueCounter{queuedCh: monitorCh}
+ monitor := &blockingQueueCounter{queuedCh: monitorCh}
ch := make(chan struct{})
- limiter := NewLimiter(1, queueLimit, nil, gauge)
+ limiter := NewLimiter(1, queueLimit, nil, monitor)
// occupied with one live request that takes a long time to complete
go func() {
@@ -303,13 +314,14 @@ func TestConcurrencyLimiter_queueLimit(t *testing.T) {
err := <-errChan
assert.Error(t, err)
assert.Equal(t, "maximum queue size reached", err.Error())
+ assert.Equal(t, monitor.droppedSize, 1)
} else {
<-monitorCh
assert.Equal(t, int64(queueLimit+1), limiter.queued)
+ assert.Equal(t, monitor.droppedSize, 0)
}
close(ch)
-
wg.Wait()
})
}
@@ -341,14 +353,15 @@ func TestLimitConcurrency_queueWaitTime(t *testing.T) {
ticker := helper.NewManualTicker()
dequeuedCh := make(chan struct{})
- gauge := &blockingDequeueCounter{dequeuedCh: dequeuedCh}
+ monitor := &blockingDequeueCounter{dequeuedCh: dequeuedCh}
+
limiter := NewLimiter(
1,
0,
func() helper.Ticker {
return ticker
},
- gauge,
+ monitor,
)
ch := make(chan struct{})
@@ -379,7 +392,7 @@ func TestLimitConcurrency_queueWaitTime(t *testing.T) {
err := <-errChan
assert.Equal(t, ErrMaxQueueTime, err)
-
+ assert.Equal(t, monitor.droppedTime, 1)
close(ch)
wg.Wait()
})
@@ -394,14 +407,15 @@ func TestLimitConcurrency_queueWaitTime(t *testing.T) {
ticker := helper.NewManualTicker()
dequeuedCh := make(chan struct{})
- gauge := &blockingDequeueCounter{dequeuedCh: dequeuedCh}
+ monitor := &blockingDequeueCounter{dequeuedCh: dequeuedCh}
+
limiter := NewLimiter(
1,
0,
func() helper.Ticker {
return ticker
},
- gauge,
+ monitor,
)
ch := make(chan struct{})
@@ -430,5 +444,6 @@ func TestLimitConcurrency_queueWaitTime(t *testing.T) {
err := <-errChan
assert.NoError(t, err)
+ assert.Equal(t, monitor.droppedTime, 0)
})
}
diff --git a/internal/middleware/limithandler/middleware.go b/internal/middleware/limithandler/middleware.go
index 347ceccf8..ac33ff4b1 100644
--- a/internal/middleware/limithandler/middleware.go
+++ b/internal/middleware/limithandler/middleware.go
@@ -38,6 +38,7 @@ type LimiterMiddleware struct {
acquiringSecondsMetric *prometheus.HistogramVec
inProgressMetric *prometheus.GaugeVec
queuedMetric *prometheus.GaugeVec
+ requestsDroppedMetric *prometheus.CounterVec
}
// New creates a new rate limiter
@@ -73,6 +74,18 @@ func New(cfg config.Cfg, getLockKey GetLockKey) *LimiterMiddleware {
},
[]string{"system", "grpc_service", "grpc_method"},
),
+ requestsDroppedMetric: prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: "gitaly_requests_dropped_total",
+ Help: "Number of requests dropped from the queue",
+ },
+ []string{
+ "system",
+ "grpc_service",
+ "grpc_method",
+ "reason",
+ },
+ ),
}
middleware.methodLimiters = createLimiterConfig(middleware, cfg)
return middleware
@@ -88,6 +101,7 @@ func (c *LimiterMiddleware) Collect(metrics chan<- prometheus.Metric) {
c.acquiringSecondsMetric.Collect(metrics)
c.inProgressMetric.Collect(metrics)
c.queuedMetric.Collect(metrics)
+ c.requestsDroppedMetric.Collect(metrics)
}
// UnaryInterceptor returns a Unary Interceptor
diff --git a/internal/middleware/limithandler/middleware_test.go b/internal/middleware/limithandler/middleware_test.go
index cfd7380e0..0c63127f9 100644
--- a/internal/middleware/limithandler/middleware_test.go
+++ b/internal/middleware/limithandler/middleware_test.go
@@ -1,15 +1,18 @@
package limithandler_test
import (
+ "bytes"
"context"
"net"
"sync"
"testing"
"time"
+ promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler"
pb "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler/testdata"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
@@ -216,7 +219,99 @@ func TestStreamLimitHandler(t *testing.T) {
}
}
-func runServer(t *testing.T, s *server, opt ...grpc.ServerOption) (*grpc.Server, string) {
+type queueTestServer struct {
+ server
+ reqArrivedCh chan struct{}
+}
+
+func (q *queueTestServer) Unary(ctx context.Context, in *pb.UnaryRequest) (*pb.UnaryResponse, error) {
+ q.registerRequest()
+
+ q.reqArrivedCh <- struct{}{} // We need a way to know when a request got to the middleware
+ <-q.blockCh // Block to ensure concurrency
+
+ return &pb.UnaryResponse{Ok: true}, nil
+}
+
+func TestLimitHandlerMetrics(t *testing.T) {
+ s := &queueTestServer{reqArrivedCh: make(chan struct{})}
+ s.blockCh = make(chan struct{})
+
+ methodName := "/test.limithandler.Test/Unary"
+ cfg := config.Cfg{
+ Concurrency: []config.Concurrency{
+ {RPC: methodName, MaxPerRepo: 1, MaxQueueSize: 1},
+ },
+ }
+
+ lh := limithandler.New(cfg, fixedLockKey)
+ interceptor := lh.UnaryInterceptor()
+ srv, serverSocketPath := runServer(t, s, grpc.UnaryInterceptor(interceptor))
+ defer srv.Stop()
+
+ client, conn := newClient(t, serverSocketPath)
+ defer conn.Close()
+
+ ctx := featureflag.IncomingCtxWithFeatureFlag(
+ testhelper.Context(t),
+ featureflag.ConcurrencyQueueEnforceMax,
+ true,
+ )
+
+ go func() {
+ _, err := client.Unary(ctx, &pb.UnaryRequest{})
+ require.NoError(t, err)
+ }()
+ // wait until the first request is being processed. After this, requests will be queued
+ <-s.reqArrivedCh
+
+ respCh := make(chan *pb.UnaryResponse)
+ errChan := make(chan error)
+ // out of ten requests, the first one will be queued and the other 9 will return with
+ // an error
+ for i := 0; i < 10; i++ {
+ go func() {
+ resp, err := client.Unary(ctx, &pb.UnaryRequest{})
+ if err != nil {
+ errChan <- err
+ } else {
+ respCh <- resp
+ }
+ }()
+ }
+
+ var errs int
+ for err := range errChan {
+ testhelper.RequireGrpcError(t, limithandler.ErrMaxQueueSize, err)
+ errs++
+ if errs == 9 {
+ break
+ }
+ }
+
+ expectedMetrics := `# HELP gitaly_rate_limiting_in_progress Gauge of number of concurrent in-progress calls
+# TYPE gitaly_rate_limiting_in_progress gauge
+gitaly_rate_limiting_in_progress{grpc_method="ReplicateRepository",grpc_service="gitaly.RepositoryService",system="gitaly"} 0
+gitaly_rate_limiting_in_progress{grpc_method="Unary",grpc_service="test.limithandler.Test",system="gitaly"} 1
+# HELP gitaly_rate_limiting_queued Gauge of number of queued calls
+# TYPE gitaly_rate_limiting_queued gauge
+gitaly_rate_limiting_queued{grpc_method="ReplicateRepository",grpc_service="gitaly.RepositoryService",system="gitaly"} 0
+gitaly_rate_limiting_queued{grpc_method="Unary",grpc_service="test.limithandler.Test",system="gitaly"} 1
+# HELP gitaly_requests_dropped_total Number of requests dropped from the queue
+# TYPE gitaly_requests_dropped_total counter
+gitaly_requests_dropped_total{grpc_method="Unary",grpc_service="test.limithandler.Test",reason="max_size",system="gitaly"} 9
+`
+ assert.NoError(t, promtest.CollectAndCompare(lh, bytes.NewBufferString(expectedMetrics),
+ "gitaly_rate_limiting_queued",
+ "gitaly_requests_dropped_total",
+ "gitaly_rate_limiting_in_progress"))
+
+ close(s.blockCh)
+ <-s.reqArrivedCh
+ <-respCh
+}
+
+func runServer(t *testing.T, s pb.TestServer, opt ...grpc.ServerOption) (*grpc.Server, string) {
serverSocketPath := testhelper.GetTemporaryGitalySocketFileName(t)
grpcServer := grpc.NewServer(opt...)
pb.RegisterTestServer(grpcServer, s)
diff --git a/internal/middleware/limithandler/monitor.go b/internal/middleware/limithandler/monitor.go
index 40c3869dc..f77014b9d 100644
--- a/internal/middleware/limithandler/monitor.go
+++ b/internal/middleware/limithandler/monitor.go
@@ -17,6 +17,7 @@ type ConcurrencyMonitor interface {
Dequeued(ctx context.Context)
Enter(ctx context.Context, acquireTime time.Duration)
Exit(ctx context.Context)
+ Dropped(ctx context.Context, message string)
}
type nullConcurrencyMonitor struct{}
@@ -25,11 +26,13 @@ func (c *nullConcurrencyMonitor) Queued(ctx context.Context)
func (c *nullConcurrencyMonitor) Dequeued(ctx context.Context) {}
func (c *nullConcurrencyMonitor) Enter(ctx context.Context, acquireTime time.Duration) {}
func (c *nullConcurrencyMonitor) Exit(ctx context.Context) {}
+func (c *nullConcurrencyMonitor) Dropped(ctx context.Context, reason string) {}
type promMonitor struct {
queuedMetric prometheus.Gauge
inProgressMetric prometheus.Gauge
acquiringSecondsMetric prometheus.Observer
+ requestsDroppedMetric *prometheus.CounterVec
}
// newPromMonitor creates a new ConcurrencyMonitor that tracks limiter
@@ -41,6 +44,11 @@ func newPromMonitor(lh *LimiterMiddleware, system string, fullMethod string) Con
lh.queuedMetric.WithLabelValues(system, serviceName, methodName),
lh.inProgressMetric.WithLabelValues(system, serviceName, methodName),
lh.acquiringSecondsMetric.WithLabelValues(system, serviceName, methodName),
+ lh.requestsDroppedMetric.MustCurryWith(prometheus.Labels{
+ "system": system,
+ "grpc_service": serviceName,
+ "grpc_method": methodName,
+ }),
}
}
@@ -67,6 +75,10 @@ func (c *promMonitor) Exit(ctx context.Context) {
c.inProgressMetric.Dec()
}
+func (c *promMonitor) Dropped(ctx context.Context, reason string) {
+ c.requestsDroppedMetric.WithLabelValues(reason).Inc()
+}
+
func splitMethodName(fullMethodName string) (string, string) {
fullMethodName = strings.TrimPrefix(fullMethodName, "/") // remove leading slash
if i := strings.Index(fullMethodName, "/"); i >= 0 {