Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJustin Tobler <jtobler@gitlab.com>2023-05-16 21:59:27 +0300
committerJustin Tobler <jtobler@gitlab.com>2023-05-16 21:59:27 +0300
commit05463be9a1df998a5a02f8b4063bad83040bc649 (patch)
tree599d26dcd0ed79796fdb4e304ff4d8fcf57959bf
parent161d11edce6a478d5186ec2c92d95d1de0f93a01 (diff)
parent477b7df395ee431031c7189b1aa76a1441dffdc2 (diff)
Merge branch 'qmnguyen0711/always-output-concurrency-queue-ms' into 'master'
Always log limit.concurrency_queue_ms field See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/5773 Merged-by: Justin Tobler <jtobler@gitlab.com> Approved-by: Justin Tobler <jtobler@gitlab.com> Approved-by: karthik nayak <knayak@gitlab.com> Reviewed-by: karthik nayak <knayak@gitlab.com> Co-authored-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
-rw-r--r--internal/grpc/middleware/limithandler/concurrency_limiter.go12
-rw-r--r--internal/grpc/middleware/limithandler/concurrency_limiter_test.go2
-rw-r--r--internal/grpc/middleware/limithandler/monitor.go15
-rw-r--r--internal/grpc/middleware/limithandler/monitor_test.go447
4 files changed, 368 insertions, 108 deletions
diff --git a/internal/grpc/middleware/limithandler/concurrency_limiter.go b/internal/grpc/middleware/limithandler/concurrency_limiter.go
index 1f807d9db..52b4c293b 100644
--- a/internal/grpc/middleware/limithandler/concurrency_limiter.go
+++ b/internal/grpc/middleware/limithandler/concurrency_limiter.go
@@ -76,7 +76,6 @@ func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context, limitingKey str
}
}()
default:
- sem.monitor.Dropped(ctx, limitingKey, len(sem.queueTokens), "max_size")
return ErrMaxQueueSize
}
}
@@ -104,10 +103,8 @@ func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context, limitingKey str
case sem.concurrencyTokens <- struct{}{}:
return nil
case <-ticker.C():
- sem.monitor.Dropped(ctx, limitingKey, len(sem.queueTokens), "max_time")
return ErrMaxQueueTime
case <-ctx.Done():
- sem.monitor.Dropped(ctx, limitingKey, len(sem.queueTokens), "other")
return ctx.Err()
}
}
@@ -120,6 +117,11 @@ func (sem *keyedConcurrencyLimiter) release() {
<-sem.concurrencyTokens
}
+// queueLength returns the length of the token queue
+func (sem *keyedConcurrencyLimiter) queueLength() int {
+ return len(sem.queueTokens)
+}
+
// ConcurrencyLimiter contains rate limiter state.
type ConcurrencyLimiter struct {
// maxConcurrencyLimit is the maximum number of concurrent calls to the limited function.
@@ -186,18 +188,22 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, limitingKey string, f Li
start := time.Now()
if err := sem.acquire(ctx, limitingKey); err != nil {
+ queueTime := time.Since(start)
switch err {
case ErrMaxQueueSize:
+ c.monitor.Dropped(ctx, limitingKey, sem.queueLength(), queueTime, "max_size")
return nil, structerr.NewResourceExhausted("%w", ErrMaxQueueSize).WithDetail(&gitalypb.LimitError{
ErrorMessage: err.Error(),
RetryAfter: durationpb.New(0),
})
case ErrMaxQueueTime:
+ c.monitor.Dropped(ctx, limitingKey, sem.queueLength(), queueTime, "max_time")
return nil, structerr.NewResourceExhausted("%w", ErrMaxQueueTime).WithDetail(&gitalypb.LimitError{
ErrorMessage: err.Error(),
RetryAfter: durationpb.New(0),
})
default:
+ c.monitor.Dropped(ctx, limitingKey, sem.queueLength(), queueTime, "other")
return nil, fmt.Errorf("unexpected error when dequeueing request: %w", err)
}
}
diff --git a/internal/grpc/middleware/limithandler/concurrency_limiter_test.go b/internal/grpc/middleware/limithandler/concurrency_limiter_test.go
index 0a0fa0734..02adb7d04 100644
--- a/internal/grpc/middleware/limithandler/concurrency_limiter_test.go
+++ b/internal/grpc/middleware/limithandler/concurrency_limiter_test.go
@@ -77,7 +77,7 @@ func (c *counter) Exit(context.Context) {
c.exit++
}
-func (c *counter) Dropped(_ context.Context, _ string, _ int, reason string) {
+func (c *counter) Dropped(_ context.Context, _ string, _ int, _ time.Duration, reason string) {
switch reason {
case "max_time":
c.droppedTime++
diff --git a/internal/grpc/middleware/limithandler/monitor.go b/internal/grpc/middleware/limithandler/monitor.go
index da295c563..b251df6d4 100644
--- a/internal/grpc/middleware/limithandler/monitor.go
+++ b/internal/grpc/middleware/limithandler/monitor.go
@@ -14,16 +14,16 @@ type ConcurrencyMonitor interface {
Dequeued(ctx context.Context)
Enter(ctx context.Context, acquireTime time.Duration)
Exit(ctx context.Context)
- Dropped(ctx context.Context, key string, length int, message string)
+ Dropped(ctx context.Context, key string, length int, acquireTime time.Duration, message string)
}
type noopConcurrencyMonitor struct{}
-func (c *noopConcurrencyMonitor) Queued(context.Context, string, int) {}
-func (c *noopConcurrencyMonitor) Dequeued(context.Context) {}
-func (c *noopConcurrencyMonitor) Enter(context.Context, time.Duration) {}
-func (c *noopConcurrencyMonitor) Exit(context.Context) {}
-func (c *noopConcurrencyMonitor) Dropped(context.Context, string, int, string) {}
+func (c *noopConcurrencyMonitor) Queued(context.Context, string, int) {}
+func (c *noopConcurrencyMonitor) Dequeued(context.Context) {}
+func (c *noopConcurrencyMonitor) Enter(context.Context, time.Duration) {}
+func (c *noopConcurrencyMonitor) Exit(context.Context) {}
+func (c *noopConcurrencyMonitor) Dropped(context.Context, string, int, time.Duration, string) {}
// NewNoopConcurrencyMonitor returns a noopConcurrencyMonitor
func NewNoopConcurrencyMonitor() ConcurrencyMonitor {
@@ -99,11 +99,12 @@ func (p *PromMonitor) Exit(ctx context.Context) {
}
// Dropped is called when a request is dropped.
-func (p *PromMonitor) Dropped(ctx context.Context, key string, length int, reason string) {
+func (p *PromMonitor) Dropped(ctx context.Context, key string, length int, acquireTime time.Duration, reason string) {
if stats := limitStatsFromContext(ctx); stats != nil {
stats.SetLimitingKey(p.limitingType, key)
stats.SetConcurrencyQueueLength(length)
stats.SetConcurrencyDroppedReason(reason)
+ stats.AddConcurrencyQueueMs(acquireTime.Milliseconds())
}
p.requestsDroppedMetric.WithLabelValues(reason).Inc()
}
diff --git a/internal/grpc/middleware/limithandler/monitor_test.go b/internal/grpc/middleware/limithandler/monitor_test.go
index 1c5eb9dbf..3b5d6163f 100644
--- a/internal/grpc/middleware/limithandler/monitor_test.go
+++ b/internal/grpc/middleware/limithandler/monitor_test.go
@@ -16,57 +16,59 @@ import (
func TestNewPerRPCPromMonitor(t *testing.T) {
system := "gitaly"
fullMethod := "fullMethod"
- acquiringSecondsVec := prometheus.NewHistogramVec(
- prometheus.HistogramOpts{
- Name: "acquiring_seconds",
- Help: "seconds to acquire",
- Buckets: promconfig.DefaultConfig().GRPCLatencyBuckets,
- },
- []string{"system", "grpc_service", "grpc_method"},
- )
- inProgressMetric := prometheus.NewGaugeVec(
- prometheus.GaugeOpts{
- Name: "in_progress",
- Help: "requests in progress",
- },
- []string{"system", "grpc_service", "grpc_method"},
- )
- queuedMetric := prometheus.NewGaugeVec(
- prometheus.GaugeOpts{
- Name: "queued",
- Help: "number of queued requests",
- },
- []string{"system", "grpc_service", "grpc_method"},
- )
- requestsDroppedMetric := prometheus.NewCounterVec(
- prometheus.CounterOpts{
- Name: "dropped",
- Help: "number of dropped requests",
- },
- []string{
- "system",
- "grpc_service",
- "grpc_method",
- "reason",
- },
- )
-
- rpcMonitor := newPerRPCPromMonitor(
- system,
- fullMethod,
- queuedMetric,
- inProgressMetric,
- acquiringSecondsVec,
- requestsDroppedMetric,
- )
-
- ctx := InitLimitStats(testhelper.Context(t))
-
- rpcMonitor.Queued(ctx, fullMethod, 5)
- rpcMonitor.Enter(ctx, time.Second)
- rpcMonitor.Dropped(ctx, fullMethod, 5, "load")
-
- expectedMetrics := `# HELP acquiring_seconds seconds to acquire
+ createNewMonitor := func() *PromMonitor {
+ acquiringSecondsVec := prometheus.NewHistogramVec(
+ prometheus.HistogramOpts{
+ Name: "acquiring_seconds",
+ Help: "seconds to acquire",
+ Buckets: promconfig.DefaultConfig().GRPCLatencyBuckets,
+ },
+ []string{"system", "grpc_service", "grpc_method"},
+ )
+ inProgressMetric := prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Name: "in_progress",
+ Help: "requests in progress",
+ },
+ []string{"system", "grpc_service", "grpc_method"},
+ )
+ queuedMetric := prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Name: "queued",
+ Help: "number of queued requests",
+ },
+ []string{"system", "grpc_service", "grpc_method"},
+ )
+ requestsDroppedMetric := prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: "dropped",
+ Help: "number of dropped requests",
+ },
+ []string{
+ "system",
+ "grpc_service",
+ "grpc_method",
+ "reason",
+ },
+ )
+ return newPerRPCPromMonitor(
+ system,
+ fullMethod,
+ queuedMetric,
+ inProgressMetric,
+ acquiringSecondsVec,
+ requestsDroppedMetric,
+ )
+ }
+
+ t.Run("request is dequeued successfully", func(t *testing.T) {
+ rpcMonitor := createNewMonitor()
+ ctx := InitLimitStats(testhelper.Context(t))
+
+ rpcMonitor.Queued(ctx, fullMethod, 5)
+ rpcMonitor.Enter(ctx, time.Second)
+
+ expectedMetrics := `# HELP acquiring_seconds seconds to acquire
# TYPE acquiring_seconds histogram
acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="0.001"} 0
acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="0.005"} 0
@@ -82,49 +84,170 @@ acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gi
acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="+Inf"} 1
acquiring_seconds_sum{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 1
acquiring_seconds_count{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 1
+# HELP in_progress requests in progress
+# TYPE in_progress gauge
+in_progress{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 1
+# HELP queued number of queued requests
+# TYPE queued gauge
+queued{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 1
+
+`
+ require.NoError(t, testutil.CollectAndCompare(
+ rpcMonitor,
+ bytes.NewBufferString(expectedMetrics),
+ "in_progress",
+ "queued",
+ "dropped",
+ "acquiring_seconds",
+ ))
+
+ stats := limitStatsFromContext(ctx)
+ require.NotNil(t, stats)
+ require.Equal(t, logrus.Fields{
+ "limit.limiting_type": TypePerRPC,
+ "limit.limiting_key": fullMethod,
+ "limit.concurrency_queue_ms": int64(1000),
+ "limit.concurrency_queue_length": 5,
+ }, stats.Fields())
+
+ // After the request exits, in_progress and queued gauges decrease
+ rpcMonitor.Exit(ctx)
+ rpcMonitor.Dequeued(ctx)
+ expectedMetrics = `# HELP acquiring_seconds seconds to acquire
+# HELP in_progress requests in progress
+# TYPE in_progress gauge
+in_progress{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 0
+# HELP queued number of queued requests
+# TYPE queued gauge
+queued{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 0
+
+`
+ require.NoError(t, testutil.CollectAndCompare(
+ rpcMonitor,
+ bytes.NewBufferString(expectedMetrics),
+ "in_progress",
+ "queued",
+ ))
+ })
+
+ t.Run("request is dropped after queueing", func(t *testing.T) {
+ rpcMonitor := createNewMonitor()
+ ctx := InitLimitStats(testhelper.Context(t))
+
+ rpcMonitor.Queued(ctx, fullMethod, 5)
+ rpcMonitor.Dropped(ctx, fullMethod, 5, time.Second, "load")
+
+ expectedMetrics := `# HELP acquiring_seconds seconds to acquire
+# TYPE acquiring_seconds histogram
+acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="0.001"} 0
+acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="0.005"} 0
+acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="0.025"} 0
+acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="0.1"} 0
+acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="0.5"} 0
+acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="1"} 0
+acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="10"} 0
+acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="30"} 0
+acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="60"} 0
+acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="300"} 0
+acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="1500"} 0
+acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="+Inf"} 0
+acquiring_seconds_sum{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 0
+acquiring_seconds_count{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 0
# HELP dropped number of dropped requests
# TYPE dropped counter
dropped{grpc_method="unknown",grpc_service="unknown",reason="load",system="gitaly"} 1
# HELP in_progress requests in progress
# TYPE in_progress gauge
-in_progress{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 1
+in_progress{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 0
# HELP queued number of queued requests
# TYPE queued gauge
queued{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 1
`
- require.NoError(t, testutil.CollectAndCompare(
- rpcMonitor,
- bytes.NewBufferString(expectedMetrics),
- "in_progress",
- "queued",
- "dropped",
- "acquiring_seconds",
- ))
-
- stats := limitStatsFromContext(ctx)
- require.NotNil(t, stats)
- require.Equal(t, logrus.Fields{
- "limit.limiting_type": TypePerRPC,
- "limit.limiting_key": fullMethod,
- "limit.concurrency_queue_ms": int64(1000),
- "limit.concurrency_queue_length": 5,
- "limit.concurrency_dropped": "load",
- }, stats.Fields())
+ require.NoError(t, testutil.CollectAndCompare(
+ rpcMonitor,
+ bytes.NewBufferString(expectedMetrics),
+ "in_progress",
+ "queued",
+ "dropped",
+ "acquiring_seconds",
+ ))
+
+ stats := limitStatsFromContext(ctx)
+ require.NotNil(t, stats)
+ require.Equal(t, logrus.Fields{
+ "limit.limiting_type": TypePerRPC,
+ "limit.limiting_key": fullMethod,
+ "limit.concurrency_queue_ms": int64(1000),
+ "limit.concurrency_queue_length": 5,
+ "limit.concurrency_dropped": "load",
+ }, stats.Fields())
+ })
+
+ t.Run("request is dropped before queueing", func(t *testing.T) {
+ rpcMonitor := createNewMonitor()
+ ctx := InitLimitStats(testhelper.Context(t))
+ rpcMonitor.Dropped(ctx, fullMethod, 5, time.Second, "load")
+
+ expectedMetrics := `# HELP acquiring_seconds seconds to acquire
+# TYPE acquiring_seconds histogram
+acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="0.001"} 0
+acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="0.005"} 0
+acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="0.025"} 0
+acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="0.1"} 0
+acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="0.5"} 0
+acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="1"} 0
+acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="10"} 0
+acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="30"} 0
+acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="60"} 0
+acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="300"} 0
+acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="1500"} 0
+acquiring_seconds_bucket{grpc_method="unknown",grpc_service="unknown",system="gitaly",le="+Inf"} 0
+acquiring_seconds_sum{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 0
+acquiring_seconds_count{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 0
+# HELP dropped number of dropped requests
+# TYPE dropped counter
+dropped{grpc_method="unknown",grpc_service="unknown",reason="load",system="gitaly"} 1
+# HELP in_progress requests in progress
+# TYPE in_progress gauge
+in_progress{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 0
+# HELP queued number of queued requests
+# TYPE queued gauge
+queued{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 0
+
+`
+ require.NoError(t, testutil.CollectAndCompare(
+ rpcMonitor,
+ bytes.NewBufferString(expectedMetrics),
+ "in_progress",
+ "queued",
+ "dropped",
+ "acquiring_seconds",
+ ))
+
+ stats := limitStatsFromContext(ctx)
+ require.NotNil(t, stats)
+ require.Equal(t, logrus.Fields{
+ "limit.limiting_type": TypePerRPC,
+ "limit.limiting_key": fullMethod,
+ "limit.concurrency_queue_ms": int64(1000),
+ "limit.concurrency_queue_length": 5,
+ "limit.concurrency_dropped": "load",
+ }, stats.Fields())
+ })
}
func TestNewPackObjectsConcurrencyMonitor(t *testing.T) {
- ctx := InitLimitStats(testhelper.Context(t))
-
- m := NewPackObjectsConcurrencyMonitor(
- promconfig.DefaultConfig().GRPCLatencyBuckets,
- )
+ t.Run("request is dequeued successfully", func(t *testing.T) {
+ ctx := InitLimitStats(testhelper.Context(t))
+ packObjectsConcurrencyMonitor := NewPackObjectsConcurrencyMonitor(
+ promconfig.DefaultConfig().GRPCLatencyBuckets,
+ )
- m.Queued(ctx, "1234", 5)
- m.Enter(ctx, time.Second)
- m.Dropped(ctx, "1234", 5, "load")
+ packObjectsConcurrencyMonitor.Queued(ctx, "1234", 5)
+ packObjectsConcurrencyMonitor.Enter(ctx, time.Second)
- expectedMetrics := `# HELP gitaly_pack_objects_acquiring_seconds Histogram of time calls are rate limited (in seconds)
+ expectedMetrics := `# HELP gitaly_pack_objects_acquiring_seconds Histogram of time calls are rate limited (in seconds)
# TYPE gitaly_pack_objects_acquiring_seconds histogram
gitaly_pack_objects_acquiring_seconds_bucket{le="0.001"} 0
gitaly_pack_objects_acquiring_seconds_bucket{le="0.005"} 0
@@ -140,30 +263,160 @@ gitaly_pack_objects_acquiring_seconds_bucket{le="1500"} 1
gitaly_pack_objects_acquiring_seconds_bucket{le="+Inf"} 1
gitaly_pack_objects_acquiring_seconds_sum 1
gitaly_pack_objects_acquiring_seconds_count 1
+# HELP gitaly_pack_objects_in_progress Gauge of number of concurrent in-progress calls
+# TYPE gitaly_pack_objects_in_progress gauge
+gitaly_pack_objects_in_progress 1
+# HELP gitaly_pack_objects_queued Gauge of number of queued calls
+# TYPE gitaly_pack_objects_queued gauge
+gitaly_pack_objects_queued 1
+
+`
+ require.NoError(t, testutil.CollectAndCompare(
+ packObjectsConcurrencyMonitor,
+ bytes.NewBufferString(expectedMetrics),
+ "gitaly_pack_objects_acquiring_seconds",
+ "gitaly_pack_objects_in_progress",
+ "gitaly_pack_objects_queued",
+ "gitaly_pack_objects_dropped_total",
+ ))
+
+ stats := limitStatsFromContext(ctx)
+ require.NotNil(t, stats)
+ require.Equal(t, logrus.Fields{
+ "limit.limiting_type": TypePackObjects,
+ "limit.limiting_key": "1234",
+ "limit.concurrency_queue_ms": int64(1000),
+ "limit.concurrency_queue_length": 5,
+ }, stats.Fields())
+
+ // After the request exits, in_progress and queued gauges decrease
+ packObjectsConcurrencyMonitor.Exit(ctx)
+ packObjectsConcurrencyMonitor.Dequeued(ctx)
+ expectedMetrics = `# HELP acquiring_seconds seconds to acquire
+# HELP gitaly_pack_objects_in_progress Gauge of number of concurrent in-progress calls
+# TYPE gitaly_pack_objects_in_progress gauge
+gitaly_pack_objects_in_progress 0
+# HELP gitaly_pack_objects_queued Gauge of number of queued calls
+# TYPE gitaly_pack_objects_queued gauge
+gitaly_pack_objects_queued 0
+
+`
+ require.NoError(t, testutil.CollectAndCompare(
+ packObjectsConcurrencyMonitor,
+ bytes.NewBufferString(expectedMetrics),
+ "gitaly_pack_objects_in_progress",
+ "gitaly_pack_objects_queued",
+ ))
+ })
+
+ t.Run("request is dropped after queueing", func(t *testing.T) {
+ ctx := InitLimitStats(testhelper.Context(t))
+ packObjectsConcurrencyMonitor := NewPackObjectsConcurrencyMonitor(
+ promconfig.DefaultConfig().GRPCLatencyBuckets,
+ )
+
+ packObjectsConcurrencyMonitor.Queued(ctx, "1234", 5)
+ packObjectsConcurrencyMonitor.Dropped(ctx, "1234", 5, time.Second, "load")
+
+ expectedMetrics := `# HELP gitaly_pack_objects_acquiring_seconds Histogram of time calls are rate limited (in seconds)
+# TYPE gitaly_pack_objects_acquiring_seconds histogram
+gitaly_pack_objects_acquiring_seconds_bucket{le="0.001"} 0
+gitaly_pack_objects_acquiring_seconds_bucket{le="0.005"} 0
+gitaly_pack_objects_acquiring_seconds_bucket{le="0.025"} 0
+gitaly_pack_objects_acquiring_seconds_bucket{le="0.1"} 0
+gitaly_pack_objects_acquiring_seconds_bucket{le="0.5"} 0
+gitaly_pack_objects_acquiring_seconds_bucket{le="1"} 0
+gitaly_pack_objects_acquiring_seconds_bucket{le="10"} 0
+gitaly_pack_objects_acquiring_seconds_bucket{le="30"} 0
+gitaly_pack_objects_acquiring_seconds_bucket{le="60"} 0
+gitaly_pack_objects_acquiring_seconds_bucket{le="300"} 0
+gitaly_pack_objects_acquiring_seconds_bucket{le="1500"} 0
+gitaly_pack_objects_acquiring_seconds_bucket{le="+Inf"} 0
+gitaly_pack_objects_acquiring_seconds_sum 0
+gitaly_pack_objects_acquiring_seconds_count 0
# HELP gitaly_pack_objects_dropped_total Number of requests dropped from the queue
# TYPE gitaly_pack_objects_dropped_total counter
gitaly_pack_objects_dropped_total{reason="load"} 1
+# HELP gitaly_pack_objects_in_progress Gauge of number of concurrent in-progress calls
+# TYPE gitaly_pack_objects_in_progress gauge
+gitaly_pack_objects_in_progress 0
# HELP gitaly_pack_objects_queued Gauge of number of queued calls
# TYPE gitaly_pack_objects_queued gauge
gitaly_pack_objects_queued 1
`
- require.NoError(t, testutil.CollectAndCompare(
- m,
- bytes.NewBufferString(expectedMetrics),
- "gitaly_pack_objects_acquiring_seconds",
- "gitaly_pack_objecfts_in_progress",
- "gitaly_pack_objects_queued",
- "gitaly_pack_objects_dropped_total",
- ))
-
- stats := limitStatsFromContext(ctx)
- require.NotNil(t, stats)
- require.Equal(t, logrus.Fields{
- "limit.limiting_type": TypePackObjects,
- "limit.limiting_key": "1234",
- "limit.concurrency_queue_ms": int64(1000),
- "limit.concurrency_queue_length": 5,
- "limit.concurrency_dropped": "load",
- }, stats.Fields())
+ require.NoError(t, testutil.CollectAndCompare(
+ packObjectsConcurrencyMonitor,
+ bytes.NewBufferString(expectedMetrics),
+ "gitaly_pack_objects_acquiring_seconds",
+ "gitaly_pack_objects_in_progress",
+ "gitaly_pack_objects_queued",
+ "gitaly_pack_objects_dropped_total",
+ ))
+
+ stats := limitStatsFromContext(ctx)
+ require.NotNil(t, stats)
+ require.Equal(t, logrus.Fields{
+ "limit.limiting_type": TypePackObjects,
+ "limit.limiting_key": "1234",
+ "limit.concurrency_queue_ms": int64(1000),
+ "limit.concurrency_queue_length": 5,
+ "limit.concurrency_dropped": "load",
+ }, stats.Fields())
+ })
+
+ t.Run("request is dropped before queueing", func(t *testing.T) {
+ ctx := InitLimitStats(testhelper.Context(t))
+ packObjectsConcurrencyMonitor := NewPackObjectsConcurrencyMonitor(
+ promconfig.DefaultConfig().GRPCLatencyBuckets,
+ )
+
+ packObjectsConcurrencyMonitor.Dropped(ctx, "1234", 5, time.Second, "load")
+
+ expectedMetrics := `# HELP gitaly_pack_objects_acquiring_seconds Histogram of time calls are rate limited (in seconds)
+# TYPE gitaly_pack_objects_acquiring_seconds histogram
+gitaly_pack_objects_acquiring_seconds_bucket{le="0.001"} 0
+gitaly_pack_objects_acquiring_seconds_bucket{le="0.005"} 0
+gitaly_pack_objects_acquiring_seconds_bucket{le="0.025"} 0
+gitaly_pack_objects_acquiring_seconds_bucket{le="0.1"} 0
+gitaly_pack_objects_acquiring_seconds_bucket{le="0.5"} 0
+gitaly_pack_objects_acquiring_seconds_bucket{le="1"} 0
+gitaly_pack_objects_acquiring_seconds_bucket{le="10"} 0
+gitaly_pack_objects_acquiring_seconds_bucket{le="30"} 0
+gitaly_pack_objects_acquiring_seconds_bucket{le="60"} 0
+gitaly_pack_objects_acquiring_seconds_bucket{le="300"} 0
+gitaly_pack_objects_acquiring_seconds_bucket{le="1500"} 0
+gitaly_pack_objects_acquiring_seconds_bucket{le="+Inf"} 0
+gitaly_pack_objects_acquiring_seconds_sum 0
+gitaly_pack_objects_acquiring_seconds_count 0
+# HELP gitaly_pack_objects_dropped_total Number of requests dropped from the queue
+# TYPE gitaly_pack_objects_dropped_total counter
+gitaly_pack_objects_dropped_total{reason="load"} 1
+# HELP gitaly_pack_objects_in_progress Gauge of number of concurrent in-progress calls
+# TYPE gitaly_pack_objects_in_progress gauge
+gitaly_pack_objects_in_progress 0
+# HELP gitaly_pack_objects_queued Gauge of number of queued calls
+# TYPE gitaly_pack_objects_queued gauge
+gitaly_pack_objects_queued 0
+
+`
+ require.NoError(t, testutil.CollectAndCompare(
+ packObjectsConcurrencyMonitor,
+ bytes.NewBufferString(expectedMetrics),
+ "gitaly_pack_objects_acquiring_seconds",
+ "gitaly_pack_objects_in_progress",
+ "gitaly_pack_objects_queued",
+ "gitaly_pack_objects_dropped_total",
+ ))
+
+ stats := limitStatsFromContext(ctx)
+ require.NotNil(t, stats)
+ require.Equal(t, logrus.Fields{
+ "limit.limiting_type": TypePackObjects,
+ "limit.limiting_key": "1234",
+ "limit.concurrency_queue_ms": int64(1000),
+ "limit.concurrency_queue_length": 5,
+ "limit.concurrency_dropped": "load",
+ }, stats.Fields())
+ })
}