Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorQuang-Minh Nguyen <qmnguyen@gitlab.com>2023-05-04 11:03:48 +0300
committerQuang-Minh Nguyen <qmnguyen@gitlab.com>2023-05-05 06:54:12 +0300
commitc748c3336be0fb73f95d2fad22b4cda865b62766 (patch)
treeefc81c0e5d9d1c7a3f3285cf1dff01e7074ddf49
parent57d660d46e312304bb49a1e02aab907e1ff1cf20 (diff)
Add limiter type to concurrency limiter
There are two versions of concurrency limiting: a generic per-rpc option and a pack-objects option. These exist on different layers, with the former being at the middleware layer and the latter located before calling the git-pack-objects command. It can be difficult to distinguish between the two from the logs. To address this, a limiter type has been added to the concurrency monitor through this commit.
-rw-r--r--internal/middleware/limithandler/concurrency_limiter.go22
-rw-r--r--internal/middleware/limithandler/concurrency_limiter_test.go2
-rw-r--r--internal/middleware/limithandler/monitor.go29
-rw-r--r--internal/middleware/limithandler/monitor_test.go2
-rw-r--r--internal/middleware/limithandler/stats.go6
-rw-r--r--internal/middleware/limithandler/stats_interceptor_test.go2
-rw-r--r--internal/middleware/limithandler/stats_test.go3
7 files changed, 48 insertions, 18 deletions
diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go
index 9bae47daa..086a2e66d 100644
--- a/internal/middleware/limithandler/concurrency_limiter.go
+++ b/internal/middleware/limithandler/concurrency_limiter.go
@@ -16,6 +16,15 @@ import (
"google.golang.org/protobuf/types/known/durationpb"
)
+const (
+ // TypePerRPC is a concurrency limiter whose key is the full method of gRPC server. All
+ // requests of the same method shares the concurrency limit.
+ TypePerRPC = "per-rpc"
+ // TypePackObjects is a dedicated concurrency limiter for pack-objects. It uses request
+ // information (RemoteIP/Repository/User) as the limiting key.
+ TypePackObjects = "pack-objects"
+)
+
// ErrMaxQueueTime indicates a request has reached the maximum time allowed to wait in the
// concurrency queue.
var ErrMaxQueueTime = errors.New("maximum time in concurrency queue reached")
@@ -312,8 +321,10 @@ func WithConcurrencyLimiters(cfg config.Cfg, middleware *LimiterMiddleware) {
limit.MaxPerRepo,
limit.MaxQueueSize,
newTickerFunc,
- newPerRPCPromMonitor("gitaly", limit.RPC, queuedMetric, inProgressMetric,
- acquiringSecondsMetric, middleware.requestsDroppedMetric),
+ newPerRPCPromMonitor(
+ "gitaly", limit.RPC,
+ queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric,
+ ),
)
}
@@ -326,8 +337,11 @@ func WithConcurrencyLimiters(cfg config.Cfg, middleware *LimiterMiddleware) {
func() helper.Ticker {
return helper.NewManualTicker()
},
- newPerRPCPromMonitor("gitaly", replicateRepositoryFullMethod, queuedMetric,
- inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric))
+ newPerRPCPromMonitor(
+ "gitaly", replicateRepositoryFullMethod,
+ queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric,
+ ),
+ )
}
middleware.methodLimiters = result
diff --git a/internal/middleware/limithandler/concurrency_limiter_test.go b/internal/middleware/limithandler/concurrency_limiter_test.go
index bf80d2b81..a4317d5a2 100644
--- a/internal/middleware/limithandler/concurrency_limiter_test.go
+++ b/internal/middleware/limithandler/concurrency_limiter_test.go
@@ -237,7 +237,7 @@ type blockingQueueCounter struct {
// Queued will block on a channel. We need a way to synchronize on when a Limiter has attempted to acquire
// a semaphore but has not yet. The caller can use the channel to wait for many requests to be queued
-func (b *blockingQueueCounter) Queued(_ context.Context, _ string, _ int) {
+func (b *blockingQueueCounter) Queued(context.Context, string, int) {
b.queuedCh <- struct{}{}
}
diff --git a/internal/middleware/limithandler/monitor.go b/internal/middleware/limithandler/monitor.go
index 0e0a68e59..aa44d3e20 100644
--- a/internal/middleware/limithandler/monitor.go
+++ b/internal/middleware/limithandler/monitor.go
@@ -8,7 +8,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
)
-// ConcurrencyMonitor allows the concurrency monitor to be observed
+// ConcurrencyMonitor allows the concurrency monitor to be observed.
type ConcurrencyMonitor interface {
Queued(ctx context.Context, key string, length int)
Dequeued(ctx context.Context)
@@ -32,8 +32,11 @@ func NewNoopConcurrencyMonitor() ConcurrencyMonitor {
// PromMonitor keeps track of prometheus metrics for limithandlers.
// It conforms to both the ConcurrencyMonitor, and prometheus.Collector
-// interfaces
+// interfaces.
type PromMonitor struct {
+ // limitingType stores the type of the limiter. There are two types at the moment: per-rpc
+ // and pack-objects.
+ limitingType string
queuedMetric prometheus.Gauge
inProgressMetric prometheus.Gauge
acquiringSecondsMetric prometheus.Observer
@@ -53,6 +56,7 @@ func newPerRPCPromMonitor(
serviceName, methodName := splitMethodName(fullMethod)
return &PromMonitor{
+ limitingType: TypePerRPC,
queuedMetric: queuedMetric.WithLabelValues(system, serviceName, methodName),
inProgressMetric: inProgressMetric.WithLabelValues(system, serviceName, methodName),
acquiringSecondsMetric: acquiringSecondsVec.WithLabelValues(system, serviceName, methodName),
@@ -65,21 +69,21 @@ func newPerRPCPromMonitor(
}
}
-// Queued is called when a request has been queued
+// Queued is called when a request has been queued.
func (p *PromMonitor) Queued(ctx context.Context, key string, queueLength int) {
if stats := limitStatsFromContext(ctx); stats != nil {
- stats.SetLimitingKey(key)
+ stats.SetLimitingKey(p.limitingType, key)
stats.SetConcurrencyQueueLength(queueLength)
}
p.queuedMetric.Inc()
}
-// Dequeued is called when a request has been dequeued
+// Dequeued is called when a request has been dequeued.
func (p *PromMonitor) Dequeued(ctx context.Context) {
p.queuedMetric.Dec()
}
-// Enter is called when a request begins to be processed
+// Enter is called when a request begins to be processed.
func (p *PromMonitor) Enter(ctx context.Context, acquireTime time.Duration) {
p.inProgressMetric.Inc()
p.acquiringSecondsMetric.Observe(acquireTime.Seconds())
@@ -89,7 +93,7 @@ func (p *PromMonitor) Enter(ctx context.Context, acquireTime time.Duration) {
}
}
-// Exit is called when a request has finished processing
+// Exit is called when a request has finished processing.
func (p *PromMonitor) Exit(ctx context.Context) {
p.inProgressMetric.Dec()
}
@@ -97,7 +101,7 @@ func (p *PromMonitor) Exit(ctx context.Context) {
// Dropped is called when a request is dropped.
func (p *PromMonitor) Dropped(ctx context.Context, key string, length int, reason string) {
if stats := limitStatsFromContext(ctx); stats != nil {
- stats.SetLimitingKey(key)
+ stats.SetLimitingKey(p.limitingType, key)
stats.SetConcurrencyQueueLength(length)
stats.SetConcurrencyDroppedReason(reason)
}
@@ -105,12 +109,14 @@ func (p *PromMonitor) Dropped(ctx context.Context, key string, length int, reaso
}
func newPromMonitor(
+ limitingType string,
keyType string,
queuedVec, inProgressVec *prometheus.GaugeVec,
acquiringSecondsVec *prometheus.HistogramVec,
requestsDroppedVec *prometheus.CounterVec,
) *PromMonitor {
return &PromMonitor{
+ limitingType: limitingType,
queuedMetric: queuedVec.WithLabelValues(keyType),
inProgressMetric: inProgressVec.WithLabelValues(keyType),
acquiringSecondsMetric: acquiringSecondsVec.WithLabelValues(keyType),
@@ -119,7 +125,7 @@ func newPromMonitor(
}
}
-// Collect collects all the metrics that PromMonitor keeps track of
+// Collect collects all the metrics that PromMonitor keeps track of.
func (p *PromMonitor) Collect(metrics chan<- prometheus.Metric) {
p.queuedMetric.Collect(metrics)
p.inProgressMetric.Collect(metrics)
@@ -127,7 +133,7 @@ func (p *PromMonitor) Collect(metrics chan<- prometheus.Metric) {
p.requestsDroppedMetric.Collect(metrics)
}
-// Describe describes all the metrics that PromMonitor keeps track of
+// Describe describes all the metrics that PromMonitor keeps track of.
func (p *PromMonitor) Describe(descs chan<- *prometheus.Desc) {
prometheus.DescribeByCollect(p, descs)
}
@@ -142,7 +148,7 @@ func splitMethodName(fullMethodName string) (string, string) {
}
// NewPackObjectsConcurrencyMonitor returns a concurrency monitor for use
-// with limiting pack objects processes
+// with limiting pack objects processes.
func NewPackObjectsConcurrencyMonitor(keyType string, latencyBuckets []float64) *PromMonitor {
acquiringSecondsVec := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
@@ -178,6 +184,7 @@ func NewPackObjectsConcurrencyMonitor(keyType string, latencyBuckets []float64)
).MustCurryWith(prometheus.Labels{"type": keyType})
return newPromMonitor(
+ TypePackObjects,
keyType,
queuedVec,
inProgressVec,
diff --git a/internal/middleware/limithandler/monitor_test.go b/internal/middleware/limithandler/monitor_test.go
index 2d9bc6c16..751332683 100644
--- a/internal/middleware/limithandler/monitor_test.go
+++ b/internal/middleware/limithandler/monitor_test.go
@@ -105,6 +105,7 @@ queued{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 1
stats := limitStatsFromContext(ctx)
require.NotNil(t, stats)
require.Equal(t, logrus.Fields{
+ "limit.limiting_type": TypePerRPC,
"limit.limiting_key": fullMethod,
"limit.concurrency_queue_ms": int64(1000),
"limit.concurrency_queue_length": 5,
@@ -160,6 +161,7 @@ gitaly_pack_objects_queued{type="user"} 1
stats := limitStatsFromContext(ctx)
require.NotNil(t, stats)
require.Equal(t, logrus.Fields{
+ "limit.limiting_type": TypePackObjects,
"limit.limiting_key": "1234",
"limit.concurrency_queue_ms": int64(1000),
"limit.concurrency_queue_length": 5,
diff --git a/internal/middleware/limithandler/stats.go b/internal/middleware/limithandler/stats.go
index 07985aeef..30926695a 100644
--- a/internal/middleware/limithandler/stats.go
+++ b/internal/middleware/limithandler/stats.go
@@ -16,6 +16,8 @@ type LimitStats struct {
sync.Mutex
// limitingKey is the key used for limiting accounting
limitingKey string
+ // limitingType is the type of limiter
+ limitingType string
// concurrencyQueueLen is the combination of in-flight requests and in-queue requests. It tells
// how busy the queue of the same limiting key is
concurrencyQueueLength int
@@ -38,10 +40,11 @@ func (s *LimitStats) AddConcurrencyQueueMs(queueMs int64) {
}
// SetLimitingKey set limiting key.
-func (s *LimitStats) SetLimitingKey(limitingKey string) {
+func (s *LimitStats) SetLimitingKey(limitingType string, limitingKey string) {
s.Lock()
defer s.Unlock()
s.limitingKey = limitingKey
+ s.limitingType = limitingType
}
// SetConcurrencyQueueLength set concurrency queue length.
@@ -67,6 +70,7 @@ func (s *LimitStats) Fields() logrus.Fields {
return nil
}
logs := logrus.Fields{
+ "limit.limiting_type": s.limitingType,
"limit.limiting_key": s.limitingKey,
"limit.concurrency_queue_ms": s.concurrencyQueueMs,
"limit.concurrency_queue_length": s.concurrencyQueueLength,
diff --git a/internal/middleware/limithandler/stats_interceptor_test.go b/internal/middleware/limithandler/stats_interceptor_test.go
index 916bae1fc..7f6942fce 100644
--- a/internal/middleware/limithandler/stats_interceptor_test.go
+++ b/internal/middleware/limithandler/stats_interceptor_test.go
@@ -120,6 +120,7 @@ func TestInterceptor(t *testing.T) {
require.NoError(t, err)
},
expectedLogData: map[string]any{
+ "limit.limiting_type": "per-rpc",
"limit.limiting_key": "@hashed/1234",
"limit.concurrency_queue_length": 0,
},
@@ -141,6 +142,7 @@ func TestInterceptor(t *testing.T) {
}
},
expectedLogData: map[string]any{
+ "limit.limiting_type": "per-rpc",
"limit.limiting_key": "@hashed/1234",
"limit.concurrency_queue_length": 0,
},
diff --git a/internal/middleware/limithandler/stats_test.go b/internal/middleware/limithandler/stats_test.go
index 66e07ce08..a51add517 100644
--- a/internal/middleware/limithandler/stats_test.go
+++ b/internal/middleware/limithandler/stats_test.go
@@ -15,12 +15,13 @@ func TestLimitStats(t *testing.T) {
ctx = InitLimitStats(ctx)
stats := limitStatsFromContext(ctx)
- stats.SetLimitingKey("hello-world")
+ stats.SetLimitingKey("test-limiter", "hello-world")
stats.AddConcurrencyQueueMs(13)
stats.SetConcurrencyQueueLength(99)
stats.SetConcurrencyDroppedReason("max_time")
assert.Equal(t, FieldsProducer(ctx, nil), logrus.Fields{
+ "limit.limiting_type": "test-limiter",
"limit.limiting_key": "hello-world",
"limit.concurrency_queue_ms": int64(13),
"limit.concurrency_queue_length": 99,