diff options
author | Justin Tobler <jtobler@gitlab.com> | 2023-05-16 21:59:27 +0300 |
---|---|---|
committer | Justin Tobler <jtobler@gitlab.com> | 2023-05-16 21:59:27 +0300 |
commit | 05463be9a1df998a5a02f8b4063bad83040bc649 (patch) | |
tree | 599d26dcd0ed79796fdb4e304ff4d8fcf57959bf | |
parent | 161d11edce6a478d5186ec2c92d95d1de0f93a01 (diff) | |
parent | 477b7df395ee431031c7189b1aa76a1441dffdc2 (diff) |
Merge branch 'qmnguyen0711/always-output-concurrency-queue-ms' into 'master'
Always log limit.concurrency_queue_ms field
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/5773
Merged-by: Justin Tobler <jtobler@gitlab.com>
Approved-by: Justin Tobler <jtobler@gitlab.com>
Approved-by: karthik nayak <knayak@gitlab.com>
Reviewed-by: karthik nayak <knayak@gitlab.com>
Co-authored-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
4 files changed, 368 insertions, 108 deletions
diff --git a/internal/grpc/middleware/limithandler/concurrency_limiter.go b/internal/grpc/middleware/limithandler/concurrency_limiter.go index 1f807d9db..52b4c293b 100644 --- a/internal/grpc/middleware/limithandler/concurrency_limiter.go +++ b/internal/grpc/middleware/limithandler/concurrency_limiter.go @@ -76,7 +76,6 @@ func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context, limitingKey str } }() default: - sem.monitor.Dropped(ctx, limitingKey, len(sem.queueTokens), "max_size") return ErrMaxQueueSize } } @@ -104,10 +103,8 @@ func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context, limitingKey str case sem.concurrencyTokens <- struct{}{}: return nil case <-ticker.C(): - sem.monitor.Dropped(ctx, limitingKey, len(sem.queueTokens), "max_time") return ErrMaxQueueTime case <-ctx.Done(): - sem.monitor.Dropped(ctx, limitingKey, len(sem.queueTokens), "other") return ctx.Err() } } @@ -120,6 +117,11 @@ func (sem *keyedConcurrencyLimiter) release() { <-sem.concurrencyTokens } +// queueLength returns the length of token queue +func (sem *keyedConcurrencyLimiter) queueLength() int { + return len(sem.queueTokens) +} + // ConcurrencyLimiter contains rate limiter state. type ConcurrencyLimiter struct { // maxConcurrencyLimit is the maximum number of concurrent calls to the limited function. @@ -186,18 +188,22 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, limitingKey string, f Li start := time.Now() if err := sem.acquire(ctx, limitingKey); err != nil { + queueTime := time.Since(start) switch err { case ErrMaxQueueSize: + c.monitor.Dropped(ctx, limitingKey, sem.queueLength(), queueTime, "max_size") return nil, structerr.NewResourceExhausted("%w", ErrMaxQueueSize).WithDetail(&gitalypb.LimitError{ ErrorMessage: err.Error(), RetryAfter: durationpb.New(0), }) case ErrMaxQueueTime: + c.monitor.Dropped(ctx, limitingKey, sem.queueLength(), queueTime, "max_time") return nil, structerr.NewResourceExhausted("%w", ErrMaxQueueTime).WithDetail(&gitalypb.LimitError{ ErrorMessage: err.Error(), RetryAfter: durationpb.New(0), }) default: + c.monitor.Dropped(ctx, limitingKey, sem.queueLength(), queueTime, "other") return nil, fmt.Errorf("unexpected error when dequeueing request: %w", err) } } diff --git a/internal/grpc/middleware/limithandler/concurrency_limiter_test.go b/internal/grpc/middleware/limithandler/concurrency_limiter_test.go index 0a0fa0734..02adb7d04 100644 --- a/internal/grpc/middleware/limithandler/concurrency_limiter_test.go +++ b/internal/grpc/middleware/limithandler/concurrency_limiter_test.go @@ -77,7 +77,7 @@ func (c *counter) Exit(context.Context) { c.exit++ } -func (c *counter) Dropped(_ context.Context, _ string, _ int, reason string) { +func (c *counter) Dropped(_ context.Context, _ string, _ int, _ time.Duration, reason string) { switch reason { case "max_time": c.droppedTime++ diff --git a/internal/grpc/middleware/limithandler/monitor.go b/internal/grpc/middleware/limithandler/monitor.go index da295c563..b251df6d4 100644 --- a/internal/grpc/middleware/limithandler/monitor.go +++ b/internal/grpc/middleware/limithandler/monitor.go @@ -14,16 +14,16 @@ type ConcurrencyMonitor interface { Dequeued(ctx context.Context) Enter(ctx context.Context, acquireTime time.Duration) Exit(ctx context.Context) - Dropped(ctx context.Context, key string, length int, message string) + Dropped(ctx context.Context, key string, length int, acquireTime time.Duration, message string) } type noopConcurrencyMonitor struct{} -func (c *noopConcurrencyMonitor) Queued(context.Context, string, int) {} -func (c *noopConcurrencyMonitor) Dequeued(context.Context) {} -func (c *noopConcurrencyMonitor) Enter(context.Context, time.Duration) {} -func (c *noopConcurrencyMonitor) Exit(context.Context) {} -func (c *noopConcurrencyMonitor) Dropped(context.Context, string, int, string) {} +func (c *noopConcurrencyMonitor) Queued(context.Context, string, int) {} +func (c *noopConcurrencyMonitor) Dequeued(context.Context) {} +func (c *noopConcurrencyMonitor) Enter(context.Context, time.Duration) {} +func (c *noopConcurrencyMonitor) Exit(context.Context) {} +func (c *noopConcurrencyMonitor) Dropped(context.Context, string, int, time.Duration, string) {} // NewNoopConcurrencyMonitor returns a noopConcurrencyMonitor func NewNoopConcurrencyMonitor() ConcurrencyMonitor { @@ -99,11 +99,12 @@ func (p *PromMonitor) Exit(ctx context.Context) { } // Dropped is called when a request is dropped. -func (p *PromMonitor) Dropped(ctx context.Context, key string, length int, reason string) { +func (p *PromMonitor) Dropped(ctx context.Context, key string, length int, acquireTime time.Duration, reason string) { if stats := limitStatsFromContext(ctx); stats != nil { stats.SetLimitingKey(p.limitingType, key) stats.SetConcurrencyQueueLength(length) stats.SetConcurrencyDroppedReason(reason) + stats.AddConcurrencyQueueMs(acquireTime.Milliseconds()) } p.requestsDroppedMetric.WithLabelValues(reason).Inc() } diff --git a/internal/grpc/middleware/limithandler/monitor_test.go b/internal/grpc/middleware/limithandler/monitor_test.go index 1c5eb9dbf..3b5d6163f 100644 --- a/internal/grpc/middleware/limithandler/monitor_test.go +++ b/internal/grpc/middleware/limithandler/monitor_test.go @@ -16,57 +16,59 @@ import ( func TestNewPerRPCPromMonitor(t *testing.T) { system := "gitaly" fullMethod := "fullMethod" - acquiringSecondsVec := prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Name: "acquiring_seconds", - Help: "seconds to acquire", - Buckets: promconfig.DefaultConfig().GRPCLatencyBuckets, - }, - []string{"system", "grpc_service", "grpc_method"}, - ) - inProgressMetric := prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "in_progress", - Help: "requests in progress", - }, - []string{"system", "grpc_service", "grpc_method"}, - ) - queuedMetric := prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "queued", - Help: "number of queued requests", - }, - []string{"system", "grpc_service", "grpc_method"}, - ) - requestsDroppedMetric := prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "dropped", - Help: "number of dropped requests", - }, - []string{ - "system", - "grpc_service", - "grpc_method", - "reason", - }, - ) - - rpcMonitor := newPerRPCPromMonitor( - system, - fullMethod, - queuedMetric, - inProgressMetric, - acquiringSecondsVec, - requestsDroppedMetric, - ) - - ctx := InitLimitStats(testhelper.Context(t)) - - rpcMonitor.Queued(ctx, fullMethod, 5) - rpcMonitor.Enter(ctx, time.Second) - rpcMonitor.Dropped(ctx, fullMethod, 5, "load") - - expectedMetrics := `# HELP acquiring_seconds seconds to acquire + createNewMonitor := func() *PromMonitor { + acquiringSecondsVec := prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "acquiring_seconds", + Help: "seconds to acquire", + Buckets: promconfig.DefaultConfig().GRPCLatencyBuckets, + }, + []string{"system", "grpc_service", "grpc_method"}, + ) + inProgressMetric := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "in_progress", + Help: "requests in progress", + }, + []string{"system", "grpc_service", "grpc_method"}, + ) + queuedMetric := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "queued", + Help: "number of queued requests", + }, + []string{"system", "grpc_service", "grpc_method"}, + ) + requestsDroppedMetric := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "dropped", + Help: "number of dropped requests", + }, + []string{ + "system", + "grpc_service", + "grpc_method", + "reason", + }, + ) + return newPerRPCPromMonitor( + system, + fullMethod, + queuedMetric, + inProgressMetric, + acquiringSecondsVec, + requestsDroppedMetric, + ) + } + + t.Run("request is dequeued successfully", func(t *testing.T) { + rpcMonitor := createNewMonitor() + ctx := InitLimitStats(testhelper.Context(t)) + + rpcMonitor.Queued(ctx, fullMethod, 5) + rpcMonitor.Enter(ctx, time.Second) + + expectedMetrics := `# HELP acquiring_seconds seconds to acquire # TYPE acquiring_seconds histogram acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="0.001"} 0 acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="0.005"} 0 @@ -82,49 +84,170 @@ acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gi acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="+Inf"} 1 acquiring_seconds_sum{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 1 acquiring_seconds_count{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 1 +# HELP in_progress requests in progress +# TYPE in_progress gauge +in_progress{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 1 +# HELP queued number of queued requests +# TYPE queued gauge +queued{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 1 + +` + require.NoError(t, testutil.CollectAndCompare( + rpcMonitor, + bytes.NewBufferString(expectedMetrics), + "in_progress", + "queued", + "dropped", + "acquiring_seconds", + )) + + stats := limitStatsFromContext(ctx) + require.NotNil(t, stats) + require.Equal(t, logrus.Fields{ + "limit.limiting_type": TypePerRPC, + "limit.limiting_key": fullMethod, + "limit.concurrency_queue_ms": int64(1000), + "limit.concurrency_queue_length": 5, + }, stats.Fields()) + + // After the request exists, in_progress and queued gauge decrease + rpcMonitor.Exit(ctx) + rpcMonitor.Dequeued(ctx) + expectedMetrics = `# HELP acquiring_seconds seconds to acquire +# HELP in_progress requests in progress +# TYPE in_progress gauge +in_progress{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 0 +# HELP queued number of queued requests +# TYPE queued gauge +queued{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 0 + +` + require.NoError(t, testutil.CollectAndCompare( + rpcMonitor, + bytes.NewBufferString(expectedMetrics), + "in_progress", + "queued", + )) + }) + + t.Run("request is dropped after queueing", func(t *testing.T) { + rpcMonitor := createNewMonitor() + ctx := InitLimitStats(testhelper.Context(t)) + + rpcMonitor.Queued(ctx, fullMethod, 5) + rpcMonitor.Dropped(ctx, fullMethod, 5, time.Second, "load") + + expectedMetrics := `# HELP acquiring_seconds seconds to acquire +# TYPE acquiring_seconds histogram +acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="0.001"} 0 +acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="0.005"} 0 +acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="0.025"} 0 +acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="0.1"} 0 +acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="0.5"} 0 +acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="1"} 0 +acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="10"} 0 +acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="30"} 0 +acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="60"} 0 +acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="300"} 0 +acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="1500"} 0 +acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="+Inf"} 0 +acquiring_seconds_sum{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 0 +acquiring_seconds_count{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 0 # HELP dropped number of dropped requests # TYPE dropped counter dropped{grpc_method="unknown",grpc_service="unknown",reason="load",system="gitaly"} 1 # HELP in_progress requests in progress # TYPE in_progress gauge -in_progress{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 1 +in_progress{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 0 # HELP queued number of queued requests # TYPE queued gauge queued{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 1 ` - require.NoError(t, testutil.CollectAndCompare( - rpcMonitor, - bytes.NewBufferString(expectedMetrics), - "in_progress", - "queued", - "dropped", - "acquiring_seconds", - )) - - stats := limitStatsFromContext(ctx) - require.NotNil(t, stats) - require.Equal(t, logrus.Fields{ - "limit.limiting_type": TypePerRPC, - "limit.limiting_key": fullMethod, - "limit.concurrency_queue_ms": int64(1000), - "limit.concurrency_queue_length": 5, - "limit.concurrency_dropped": "load", - }, stats.Fields()) + require.NoError(t, testutil.CollectAndCompare( + rpcMonitor, + bytes.NewBufferString(expectedMetrics), + "in_progress", + "queued", + "dropped", + "acquiring_seconds", + )) + + stats := limitStatsFromContext(ctx) + require.NotNil(t, stats) + require.Equal(t, logrus.Fields{ + "limit.limiting_type": TypePerRPC, + "limit.limiting_key": fullMethod, + "limit.concurrency_queue_ms": int64(1000), + "limit.concurrency_queue_length": 5, + "limit.concurrency_dropped": "load", + }, stats.Fields()) + }) + + t.Run("request is dropped before queueing", func(t *testing.T) { + rpcMonitor := createNewMonitor() + ctx := InitLimitStats(testhelper.Context(t)) + rpcMonitor.Dropped(ctx, fullMethod, 5, time.Second, "load") + + expectedMetrics := `# HELP acquiring_seconds seconds to acquire +# TYPE acquiring_seconds histogram +acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="0.001"} 0 +acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="0.005"} 0 +acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="0.025"} 0 +acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="0.1"} 0 +acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="0.5"} 0 +acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="1"} 0 +acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="10"} 0 +acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="30"} 0 +acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="60"} 0 +acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="300"} 0 +acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="1500"} 0 +acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="+Inf"} 0 +acquiring_seconds_sum{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 0 +acquiring_seconds_count{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 0 +# HELP dropped number of dropped requests +# TYPE dropped counter +dropped{grpc_method="unknown",grpc_service="unknown",reason="load",system="gitaly"} 1 +# HELP in_progress requests in progress +# TYPE in_progress gauge +in_progress{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 0 +# HELP queued number of queued requests +# TYPE queued gauge +queued{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 0 + +` + require.NoError(t, testutil.CollectAndCompare( + rpcMonitor, + bytes.NewBufferString(expectedMetrics), + "in_progress", + "queued", + "dropped", + "acquiring_seconds", + )) + + stats := limitStatsFromContext(ctx) + require.NotNil(t, stats) + require.Equal(t, logrus.Fields{ + "limit.limiting_type": TypePerRPC, + "limit.limiting_key": fullMethod, + "limit.concurrency_queue_ms": int64(1000), + "limit.concurrency_queue_length": 5, + "limit.concurrency_dropped": "load", + }, stats.Fields()) + }) } func TestNewPackObjectsConcurrencyMonitor(t *testing.T) { - ctx := InitLimitStats(testhelper.Context(t)) - - m := NewPackObjectsConcurrencyMonitor( - promconfig.DefaultConfig().GRPCLatencyBuckets, - ) + t.Run("request is dequeued successfully", func(t *testing.T) { + ctx := InitLimitStats(testhelper.Context(t)) + packObjectsConcurrencyMonitor := NewPackObjectsConcurrencyMonitor( + promconfig.DefaultConfig().GRPCLatencyBuckets, + ) - m.Queued(ctx, "1234", 5) - m.Enter(ctx, time.Second) - m.Dropped(ctx, "1234", 5, "load") + packObjectsConcurrencyMonitor.Queued(ctx, "1234", 5) + packObjectsConcurrencyMonitor.Enter(ctx, time.Second) - expectedMetrics := `# HELP gitaly_pack_objects_acquiring_seconds Histogram of time calls are rate limited (in seconds) + expectedMetrics := `# HELP gitaly_pack_objects_acquiring_seconds Histogram of time calls are rate limited (in seconds) # TYPE gitaly_pack_objects_acquiring_seconds histogram gitaly_pack_objects_acquiring_seconds_bucket{le="0.001"} 0 gitaly_pack_objects_acquiring_seconds_bucket{le="0.005"} 0 @@ -140,30 +263,160 @@ gitaly_pack_objects_acquiring_seconds_bucket{le="1500"} 1 gitaly_pack_objects_acquiring_seconds_bucket{le="+Inf"} 1 gitaly_pack_objects_acquiring_seconds_sum 1 gitaly_pack_objects_acquiring_seconds_count 1 +# HELP gitaly_pack_objects_in_progress Gauge of number of concurrent in-progress calls +# TYPE gitaly_pack_objects_in_progress gauge +gitaly_pack_objects_in_progress 1 +# HELP gitaly_pack_objects_queued Gauge of number of queued calls +# TYPE gitaly_pack_objects_queued gauge +gitaly_pack_objects_queued 1 + +` + require.NoError(t, testutil.CollectAndCompare( + packObjectsConcurrencyMonitor, + bytes.NewBufferString(expectedMetrics), + "gitaly_pack_objects_acquiring_seconds", + "gitaly_pack_objects_in_progress", + "gitaly_pack_objects_queued", + "gitaly_pack_objects_dropped_total", + )) + + stats := limitStatsFromContext(ctx) + require.NotNil(t, stats) + require.Equal(t, logrus.Fields{ + "limit.limiting_type": TypePackObjects, + "limit.limiting_key": "1234", + "limit.concurrency_queue_ms": int64(1000), + "limit.concurrency_queue_length": 5, + }, stats.Fields()) + + // After the request exists, in_progress and queued gauge decrease + packObjectsConcurrencyMonitor.Exit(ctx) + packObjectsConcurrencyMonitor.Dequeued(ctx) + expectedMetrics = `# HELP acquiring_seconds seconds to acquire +# HELP gitaly_pack_objects_in_progress Gauge of number of concurrent in-progress calls +# TYPE gitaly_pack_objects_in_progress gauge +gitaly_pack_objects_in_progress 0 +# HELP gitaly_pack_objects_queued Gauge of number of queued calls +# TYPE gitaly_pack_objects_queued gauge +gitaly_pack_objects_queued 0 + +` + require.NoError(t, testutil.CollectAndCompare( + packObjectsConcurrencyMonitor, + bytes.NewBufferString(expectedMetrics), + "gitaly_pack_objects_in_progress", + "gitaly_pack_objects_queued", + )) + }) + + t.Run("request is dropped after queueing", func(t *testing.T) { + ctx := InitLimitStats(testhelper.Context(t)) + packObjectsConcurrencyMonitor := NewPackObjectsConcurrencyMonitor( + promconfig.DefaultConfig().GRPCLatencyBuckets, + ) + + packObjectsConcurrencyMonitor.Queued(ctx, "1234", 5) + packObjectsConcurrencyMonitor.Dropped(ctx, "1234", 5, time.Second, "load") + + expectedMetrics := `# HELP gitaly_pack_objects_acquiring_seconds Histogram of time calls are rate limited (in seconds) +# TYPE gitaly_pack_objects_acquiring_seconds histogram +gitaly_pack_objects_acquiring_seconds_bucket{le="0.001"} 0 +gitaly_pack_objects_acquiring_seconds_bucket{le="0.005"} 0 +gitaly_pack_objects_acquiring_seconds_bucket{le="0.025"} 0 +gitaly_pack_objects_acquiring_seconds_bucket{le="0.1"} 0 +gitaly_pack_objects_acquiring_seconds_bucket{le="0.5"} 0 +gitaly_pack_objects_acquiring_seconds_bucket{le="1"} 0 +gitaly_pack_objects_acquiring_seconds_bucket{le="10"} 0 +gitaly_pack_objects_acquiring_seconds_bucket{le="30"} 0 +gitaly_pack_objects_acquiring_seconds_bucket{le="60"} 0 +gitaly_pack_objects_acquiring_seconds_bucket{le="300"} 0 +gitaly_pack_objects_acquiring_seconds_bucket{le="1500"} 0 +gitaly_pack_objects_acquiring_seconds_bucket{le="+Inf"} 0 +gitaly_pack_objects_acquiring_seconds_sum 0 +gitaly_pack_objects_acquiring_seconds_count 0 # HELP gitaly_pack_objects_dropped_total Number of requests dropped from the queue # TYPE gitaly_pack_objects_dropped_total counter gitaly_pack_objects_dropped_total{reason="load"} 1 +# HELP gitaly_pack_objects_in_progress Gauge of number of concurrent in-progress calls +# TYPE gitaly_pack_objects_in_progress gauge +gitaly_pack_objects_in_progress 0 # HELP gitaly_pack_objects_queued Gauge of number of queued calls # TYPE gitaly_pack_objects_queued gauge gitaly_pack_objects_queued 1 ` - require.NoError(t, testutil.CollectAndCompare( - m, - bytes.NewBufferString(expectedMetrics), - "gitaly_pack_objects_acquiring_seconds", - "gitaly_pack_objecfts_in_progress", - "gitaly_pack_objects_queued", - "gitaly_pack_objects_dropped_total", - )) - - stats := limitStatsFromContext(ctx) - require.NotNil(t, stats) - require.Equal(t, logrus.Fields{ - "limit.limiting_type": TypePackObjects, - "limit.limiting_key": "1234", - "limit.concurrency_queue_ms": int64(1000), - "limit.concurrency_queue_length": 5, - "limit.concurrency_dropped": "load", - }, stats.Fields()) + require.NoError(t, testutil.CollectAndCompare( + packObjectsConcurrencyMonitor, + bytes.NewBufferString(expectedMetrics), + "gitaly_pack_objects_acquiring_seconds", + "gitaly_pack_objects_in_progress", + "gitaly_pack_objects_queued", + "gitaly_pack_objects_dropped_total", + )) + + stats := limitStatsFromContext(ctx) + require.NotNil(t, stats) + require.Equal(t, logrus.Fields{ + "limit.limiting_type": TypePackObjects, + "limit.limiting_key": "1234", + "limit.concurrency_queue_ms": int64(1000), + "limit.concurrency_queue_length": 5, + "limit.concurrency_dropped": "load", + }, stats.Fields()) + }) + + t.Run("request is dropped before queueing", func(t *testing.T) { + ctx := InitLimitStats(testhelper.Context(t)) + packObjectsConcurrencyMonitor := NewPackObjectsConcurrencyMonitor( + promconfig.DefaultConfig().GRPCLatencyBuckets, + ) + + packObjectsConcurrencyMonitor.Dropped(ctx, "1234", 5, time.Second, "load") + + expectedMetrics := `# HELP gitaly_pack_objects_acquiring_seconds Histogram of time calls are rate limited (in seconds) +# TYPE gitaly_pack_objects_acquiring_seconds histogram +gitaly_pack_objects_acquiring_seconds_bucket{le="0.001"} 0 +gitaly_pack_objects_acquiring_seconds_bucket{le="0.005"} 0 +gitaly_pack_objects_acquiring_seconds_bucket{le="0.025"} 0 +gitaly_pack_objects_acquiring_seconds_bucket{le="0.1"} 0 +gitaly_pack_objects_acquiring_seconds_bucket{le="0.5"} 0 +gitaly_pack_objects_acquiring_seconds_bucket{le="1"} 0 +gitaly_pack_objects_acquiring_seconds_bucket{le="10"} 0 +gitaly_pack_objects_acquiring_seconds_bucket{le="30"} 0 +gitaly_pack_objects_acquiring_seconds_bucket{le="60"} 0 +gitaly_pack_objects_acquiring_seconds_bucket{le="300"} 0 +gitaly_pack_objects_acquiring_seconds_bucket{le="1500"} 0 +gitaly_pack_objects_acquiring_seconds_bucket{le="+Inf"} 0 +gitaly_pack_objects_acquiring_seconds_sum 0 +gitaly_pack_objects_acquiring_seconds_count 0 +# HELP gitaly_pack_objects_dropped_total Number of requests dropped from the queue +# TYPE gitaly_pack_objects_dropped_total counter +gitaly_pack_objects_dropped_total{reason="load"} 1 +# HELP gitaly_pack_objects_in_progress Gauge of number of concurrent in-progress calls +# TYPE gitaly_pack_objects_in_progress gauge +gitaly_pack_objects_in_progress 0 +# HELP gitaly_pack_objects_queued Gauge of number of queued calls +# TYPE gitaly_pack_objects_queued gauge +gitaly_pack_objects_queued 0 + +` + require.NoError(t, testutil.CollectAndCompare( + packObjectsConcurrencyMonitor, + bytes.NewBufferString(expectedMetrics), + "gitaly_pack_objects_acquiring_seconds", + "gitaly_pack_objects_in_progress", + "gitaly_pack_objects_queued", + "gitaly_pack_objects_dropped_total", + )) + + stats := limitStatsFromContext(ctx) + require.NotNil(t, stats) + require.Equal(t, logrus.Fields{ + "limit.limiting_type": TypePackObjects, + "limit.limiting_key": "1234", + "limit.concurrency_queue_ms": int64(1000), + "limit.concurrency_queue_length": 5, + "limit.concurrency_dropped": "load", + }, stats.Fields()) + }) } |