diff options
author | Quang-Minh Nguyen <qmnguyen@gitlab.com> | 2023-05-04 11:03:48 +0300 |
---|---|---|
committer | Quang-Minh Nguyen <qmnguyen@gitlab.com> | 2023-05-05 06:54:12 +0300 |
commit | c748c3336be0fb73f95d2fad22b4cda865b62766 (patch) | |
tree | efc81c0e5d9d1c7a3f3285cf1dff01e7074ddf49 | |
parent | 57d660d46e312304bb49a1e02aab907e1ff1cf20 (diff) |
Add limiter type to concurrency limiter
There are two versions of concurrency limiting: a generic per-rpc option
and a pack-objects option. These exist on different layers, with the
former at the middleware layer and the latter applied just before
calling the git-pack-objects command. It can be difficult to distinguish
between the two from the logs. To address this, this commit adds a
limiter type to the concurrency monitor.
7 files changed, 48 insertions, 18 deletions
diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go index 9bae47daa..086a2e66d 100644 --- a/internal/middleware/limithandler/concurrency_limiter.go +++ b/internal/middleware/limithandler/concurrency_limiter.go @@ -16,6 +16,15 @@ import ( "google.golang.org/protobuf/types/known/durationpb" ) +const ( + // TypePerRPC is a concurrency limiter whose key is the full method of gRPC server. All + // requests of the same method shares the concurrency limit. + TypePerRPC = "per-rpc" + // TypePackObjects is a dedicated concurrency limiter for pack-objects. It uses request + // information (RemoteIP/Repository/User) as the limiting key. + TypePackObjects = "pack-objects" +) + // ErrMaxQueueTime indicates a request has reached the maximum time allowed to wait in the // concurrency queue. var ErrMaxQueueTime = errors.New("maximum time in concurrency queue reached") @@ -312,8 +321,10 @@ func WithConcurrencyLimiters(cfg config.Cfg, middleware *LimiterMiddleware) { limit.MaxPerRepo, limit.MaxQueueSize, newTickerFunc, - newPerRPCPromMonitor("gitaly", limit.RPC, queuedMetric, inProgressMetric, - acquiringSecondsMetric, middleware.requestsDroppedMetric), + newPerRPCPromMonitor( + "gitaly", limit.RPC, + queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric, + ), ) } @@ -326,8 +337,11 @@ func WithConcurrencyLimiters(cfg config.Cfg, middleware *LimiterMiddleware) { func() helper.Ticker { return helper.NewManualTicker() }, - newPerRPCPromMonitor("gitaly", replicateRepositoryFullMethod, queuedMetric, - inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric)) + newPerRPCPromMonitor( + "gitaly", replicateRepositoryFullMethod, + queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric, + ), + ) } middleware.methodLimiters = result diff --git a/internal/middleware/limithandler/concurrency_limiter_test.go 
b/internal/middleware/limithandler/concurrency_limiter_test.go index bf80d2b81..a4317d5a2 100644 --- a/internal/middleware/limithandler/concurrency_limiter_test.go +++ b/internal/middleware/limithandler/concurrency_limiter_test.go @@ -237,7 +237,7 @@ type blockingQueueCounter struct { // Queued will block on a channel. We need a way to synchronize on when a Limiter has attempted to acquire // a semaphore but has not yet. The caller can use the channel to wait for many requests to be queued -func (b *blockingQueueCounter) Queued(_ context.Context, _ string, _ int) { +func (b *blockingQueueCounter) Queued(context.Context, string, int) { b.queuedCh <- struct{}{} } diff --git a/internal/middleware/limithandler/monitor.go b/internal/middleware/limithandler/monitor.go index 0e0a68e59..aa44d3e20 100644 --- a/internal/middleware/limithandler/monitor.go +++ b/internal/middleware/limithandler/monitor.go @@ -8,7 +8,7 @@ import ( "github.com/prometheus/client_golang/prometheus" ) -// ConcurrencyMonitor allows the concurrency monitor to be observed +// ConcurrencyMonitor allows the concurrency monitor to be observed. type ConcurrencyMonitor interface { Queued(ctx context.Context, key string, length int) Dequeued(ctx context.Context) @@ -32,8 +32,11 @@ func NewNoopConcurrencyMonitor() ConcurrencyMonitor { // PromMonitor keeps track of prometheus metrics for limithandlers. // It conforms to both the ConcurrencyMonitor, and prometheus.Collector -// interfaces +// interfaces. type PromMonitor struct { + // limitingType stores the type of the limiter. There are two types at the moment: per-rpc + // and pack-objects. 
+ limitingType string queuedMetric prometheus.Gauge inProgressMetric prometheus.Gauge acquiringSecondsMetric prometheus.Observer @@ -53,6 +56,7 @@ func newPerRPCPromMonitor( serviceName, methodName := splitMethodName(fullMethod) return &PromMonitor{ + limitingType: TypePerRPC, queuedMetric: queuedMetric.WithLabelValues(system, serviceName, methodName), inProgressMetric: inProgressMetric.WithLabelValues(system, serviceName, methodName), acquiringSecondsMetric: acquiringSecondsVec.WithLabelValues(system, serviceName, methodName), @@ -65,21 +69,21 @@ func newPerRPCPromMonitor( } } -// Queued is called when a request has been queued +// Queued is called when a request has been queued. func (p *PromMonitor) Queued(ctx context.Context, key string, queueLength int) { if stats := limitStatsFromContext(ctx); stats != nil { - stats.SetLimitingKey(key) + stats.SetLimitingKey(p.limitingType, key) stats.SetConcurrencyQueueLength(queueLength) } p.queuedMetric.Inc() } -// Dequeued is called when a request has been dequeued +// Dequeued is called when a request has been dequeued. func (p *PromMonitor) Dequeued(ctx context.Context) { p.queuedMetric.Dec() } -// Enter is called when a request begins to be processed +// Enter is called when a request begins to be processed. func (p *PromMonitor) Enter(ctx context.Context, acquireTime time.Duration) { p.inProgressMetric.Inc() p.acquiringSecondsMetric.Observe(acquireTime.Seconds()) @@ -89,7 +93,7 @@ func (p *PromMonitor) Enter(ctx context.Context, acquireTime time.Duration) { } } -// Exit is called when a request has finished processing +// Exit is called when a request has finished processing. func (p *PromMonitor) Exit(ctx context.Context) { p.inProgressMetric.Dec() } @@ -97,7 +101,7 @@ func (p *PromMonitor) Exit(ctx context.Context) { // Dropped is called when a request is dropped. 
func (p *PromMonitor) Dropped(ctx context.Context, key string, length int, reason string) { if stats := limitStatsFromContext(ctx); stats != nil { - stats.SetLimitingKey(key) + stats.SetLimitingKey(p.limitingType, key) stats.SetConcurrencyQueueLength(length) stats.SetConcurrencyDroppedReason(reason) } @@ -105,12 +109,14 @@ func (p *PromMonitor) Dropped(ctx context.Context, key string, length int, reaso } func newPromMonitor( + limitingType string, keyType string, queuedVec, inProgressVec *prometheus.GaugeVec, acquiringSecondsVec *prometheus.HistogramVec, requestsDroppedVec *prometheus.CounterVec, ) *PromMonitor { return &PromMonitor{ + limitingType: limitingType, queuedMetric: queuedVec.WithLabelValues(keyType), inProgressMetric: inProgressVec.WithLabelValues(keyType), acquiringSecondsMetric: acquiringSecondsVec.WithLabelValues(keyType), @@ -119,7 +125,7 @@ func newPromMonitor( } } -// Collect collects all the metrics that PromMonitor keeps track of +// Collect collects all the metrics that PromMonitor keeps track of. func (p *PromMonitor) Collect(metrics chan<- prometheus.Metric) { p.queuedMetric.Collect(metrics) p.inProgressMetric.Collect(metrics) @@ -127,7 +133,7 @@ func (p *PromMonitor) Collect(metrics chan<- prometheus.Metric) { p.requestsDroppedMetric.Collect(metrics) } -// Describe describes all the metrics that PromMonitor keeps track of +// Describe describes all the metrics that PromMonitor keeps track of. func (p *PromMonitor) Describe(descs chan<- *prometheus.Desc) { prometheus.DescribeByCollect(p, descs) } @@ -142,7 +148,7 @@ func splitMethodName(fullMethodName string) (string, string) { } // NewPackObjectsConcurrencyMonitor returns a concurrency monitor for use -// with limiting pack objects processes +// with limiting pack objects processes. 
func NewPackObjectsConcurrencyMonitor(keyType string, latencyBuckets []float64) *PromMonitor { acquiringSecondsVec := prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -178,6 +184,7 @@ func NewPackObjectsConcurrencyMonitor(keyType string, latencyBuckets []float64) ).MustCurryWith(prometheus.Labels{"type": keyType}) return newPromMonitor( + TypePackObjects, keyType, queuedVec, inProgressVec, diff --git a/internal/middleware/limithandler/monitor_test.go b/internal/middleware/limithandler/monitor_test.go index 2d9bc6c16..751332683 100644 --- a/internal/middleware/limithandler/monitor_test.go +++ b/internal/middleware/limithandler/monitor_test.go @@ -105,6 +105,7 @@ queued{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 1 stats := limitStatsFromContext(ctx) require.NotNil(t, stats) require.Equal(t, logrus.Fields{ + "limit.limiting_type": TypePerRPC, "limit.limiting_key": fullMethod, "limit.concurrency_queue_ms": int64(1000), "limit.concurrency_queue_length": 5, @@ -160,6 +161,7 @@ gitaly_pack_objects_queued{type="user"} 1 stats := limitStatsFromContext(ctx) require.NotNil(t, stats) require.Equal(t, logrus.Fields{ + "limit.limiting_type": TypePackObjects, "limit.limiting_key": "1234", "limit.concurrency_queue_ms": int64(1000), "limit.concurrency_queue_length": 5, diff --git a/internal/middleware/limithandler/stats.go b/internal/middleware/limithandler/stats.go index 07985aeef..30926695a 100644 --- a/internal/middleware/limithandler/stats.go +++ b/internal/middleware/limithandler/stats.go @@ -16,6 +16,8 @@ type LimitStats struct { sync.Mutex // limitingKey is the key used for limiting accounting limitingKey string + // limitingType is the type of limiter + limitingType string // concurrencyQueueLen is the combination of in-flight requests and in-queue requests. 
It tells // how busy the queue of the same limiting key is concurrencyQueueLength int @@ -38,10 +40,11 @@ func (s *LimitStats) AddConcurrencyQueueMs(queueMs int64) { } // SetLimitingKey set limiting key. -func (s *LimitStats) SetLimitingKey(limitingKey string) { +func (s *LimitStats) SetLimitingKey(limitingType string, limitingKey string) { s.Lock() defer s.Unlock() s.limitingKey = limitingKey + s.limitingType = limitingType } // SetConcurrencyQueueLength set concurrency queue length. @@ -67,6 +70,7 @@ func (s *LimitStats) Fields() logrus.Fields { return nil } logs := logrus.Fields{ + "limit.limiting_type": s.limitingType, "limit.limiting_key": s.limitingKey, "limit.concurrency_queue_ms": s.concurrencyQueueMs, "limit.concurrency_queue_length": s.concurrencyQueueLength, diff --git a/internal/middleware/limithandler/stats_interceptor_test.go b/internal/middleware/limithandler/stats_interceptor_test.go index 916bae1fc..7f6942fce 100644 --- a/internal/middleware/limithandler/stats_interceptor_test.go +++ b/internal/middleware/limithandler/stats_interceptor_test.go @@ -120,6 +120,7 @@ func TestInterceptor(t *testing.T) { require.NoError(t, err) }, expectedLogData: map[string]any{ + "limit.limiting_type": "per-rpc", "limit.limiting_key": "@hashed/1234", "limit.concurrency_queue_length": 0, }, @@ -141,6 +142,7 @@ func TestInterceptor(t *testing.T) { } }, expectedLogData: map[string]any{ + "limit.limiting_type": "per-rpc", "limit.limiting_key": "@hashed/1234", "limit.concurrency_queue_length": 0, }, diff --git a/internal/middleware/limithandler/stats_test.go b/internal/middleware/limithandler/stats_test.go index 66e07ce08..a51add517 100644 --- a/internal/middleware/limithandler/stats_test.go +++ b/internal/middleware/limithandler/stats_test.go @@ -15,12 +15,13 @@ func TestLimitStats(t *testing.T) { ctx = InitLimitStats(ctx) stats := limitStatsFromContext(ctx) - stats.SetLimitingKey("hello-world") + stats.SetLimitingKey("test-limiter", "hello-world") 
stats.AddConcurrencyQueueMs(13) stats.SetConcurrencyQueueLength(99) stats.SetConcurrencyDroppedReason("max_time") assert.Equal(t, FieldsProducer(ctx, nil), logrus.Fields{ + "limit.limiting_type": "test-limiter", "limit.limiting_key": "hello-world", "limit.concurrency_queue_ms": int64(13), "limit.concurrency_queue_length": 99, |