diff options
author | Quang-Minh Nguyen <qmnguyen@gitlab.com> | 2023-05-08 06:31:25 +0300 |
---|---|---|
committer | Quang-Minh Nguyen <qmnguyen@gitlab.com> | 2023-05-08 06:31:25 +0300 |
commit | bf616bd24e576aa6792f1a9173ca68cc32d42319 (patch) | |
tree | 3483f39b2f8b9a9c776653015d5b9f99b32de0dc | |
parent | 91b69d050acf344c09a9238f24a75c4938001113 (diff) | |
parent | 88265de0ad86df9a7a65c9533fca36bf7c9056bd (diff) |
Merge branch 'qmnguyen0711/move-concurrency-logs-to-grpc-logs' into 'master'
Improve concurrency limits and pack-objects logs
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/5719
Merged-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Approved-by: Patrick Steinhardt <psteinhardt@gitlab.com>
Approved-by: Pavlo Strokov <pstrokov@gitlab.com>
Reviewed-by: Patrick Steinhardt <psteinhardt@gitlab.com>
Reviewed-by: Pavlo Strokov <pstrokov@gitlab.com>
Reviewed-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
-rw-r--r-- | internal/gitaly/service/hook/pack_objects.go | 32 | ||||
-rw-r--r-- | internal/gitaly/service/hook/pack_objects_test.go | 37 | ||||
-rw-r--r-- | internal/middleware/limithandler/concurrency_limiter.go | 39 | ||||
-rw-r--r-- | internal/middleware/limithandler/concurrency_limiter_test.go | 12 | ||||
-rw-r--r-- | internal/middleware/limithandler/monitor.go | 61 | ||||
-rw-r--r-- | internal/middleware/limithandler/monitor_test.go | 33 | ||||
-rw-r--r-- | internal/middleware/limithandler/stats.go | 63 | ||||
-rw-r--r-- | internal/middleware/limithandler/stats_interceptor_test.go | 34 | ||||
-rw-r--r-- | internal/middleware/limithandler/stats_test.go | 11 |
9 files changed, 213 insertions, 109 deletions
diff --git a/internal/gitaly/service/hook/pack_objects.go b/internal/gitaly/service/hook/pack_objects.go index 32b209f67..8d1ba124e 100644 --- a/internal/gitaly/service/hook/pack_objects.go +++ b/internal/gitaly/service/hook/pack_objects.go @@ -15,10 +15,9 @@ import ( "strings" "syscall" - "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/v15/internal/command" "gitlab.com/gitlab-org/gitaly/v15/internal/git" "gitlab.com/gitlab-org/gitaly/v15/internal/git/pktline" gitalyhook "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/hook" @@ -129,10 +128,16 @@ func (s *server) packObjectsHook(ctx context.Context, req *gitalypb.PackObjectsH packObjectsCacheLookups.WithLabelValues("hit").Inc() } - ctxlogrus.Extract(ctx).WithFields(logrus.Fields{ - "cache_key": cacheKey, - "bytes": servedBytes, - }).Info("served bytes") + stats := command.StatsFromContext(ctx) + if stats != nil { + stats.RecordMetadata("pack_objects_cache.key", cacheKey) + stats.RecordSum("pack_objects_cache.served_bytes", int(servedBytes)) + if created { + stats.RecordMetadata("pack_objects_cache.hit", "false") + } else { + stats.RecordMetadata("pack_objects_cache.hit", "true") + } + } packObjectsServedBytes.Add(float64(servedBytes)) return nil @@ -269,14 +274,13 @@ func runPackObjects( defer func() { packObjectsGeneratedBytes.Add(float64(counter.N)) - logger := ctxlogrus.Extract(ctx) - logger.WithFields(logrus.Fields{ - "cache_key": key, - "bytes": counter.N, - }).Info("generated bytes") - - if total := totalMessage(stderrBuf.Bytes()); total != "" { - logger.WithField("pack.stat", total).Info("pack file compression statistic") + stats := command.StatsFromContext(ctx) + if stats != nil { + stats.RecordMetadata("pack_objects_cache.key", key) + stats.RecordSum("pack_objects_cache.generated_bytes", int(counter.N)) + if total := totalMessage(stderrBuf.Bytes()); total != "" { + stats.RecordMetadata("pack_objects.compression_statistics", total) + } } }() diff --git a/internal/gitaly/service/hook/pack_objects_test.go b/internal/gitaly/service/hook/pack_objects_test.go index 0545d50f7..4c0a5342d 100644 --- a/internal/gitaly/service/hook/pack_objects_test.go +++ b/internal/gitaly/service/hook/pack_objects_test.go @@ -15,7 +15,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" - "github.com/sirupsen/logrus" "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -546,34 +545,18 @@ func testServerPackObjectsHookWithSidechannelWithRuntimeDir(t *testing.T, ctx co "-C", repoPath, "index-pack", "--stdin", "--fix-thin", ) - for _, msg := range []string{"served bytes", "generated bytes"} { - t.Run(msg, func(t *testing.T) { - var entry *logrus.Entry - for _, e := range hook.AllEntries() { - if e.Message == msg { - entry = e - } - } + entry := hook.LastEntry() + require.NotNil(t, entry) - require.NotNil(t, entry) - require.NotEmpty(t, entry.Data["cache_key"]) - require.Greater(t, entry.Data["bytes"], int64(0)) - }) - } + fields := entry.Data + require.Equal(t, fields["pack_objects_cache.hit"], "false") + require.Contains(t, fields, "pack_objects_cache.key") + require.Greater(t, fields["pack_objects_cache.served_bytes"], 0) + require.Greater(t, fields["pack_objects_cache.generated_bytes"], 0) - t.Run("pack file compression statistic", func(t *testing.T) { - var entry *logrus.Entry - for _, e := range hook.AllEntries() { - if e.Message == "pack file compression statistic" { - entry = e - } - } - - require.NotNil(t, entry) - total := entry.Data["pack.stat"].(string) - require.True(t, strings.HasPrefix(total, "Total ")) - require.False(t, strings.Contains(total, "\n")) - }) + total := fields["pack_objects.compression_statistics"].(string) + require.True(t, strings.HasPrefix(total, "Total ")) + require.False(t, strings.Contains(total, "\n")) expectedMetrics := `# HELP gitaly_pack_objects_concurrent_processes Number of concurrent processes # TYPE gitaly_pack_objects_concurrent_processes histogram diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go index d200ac8ae..086a2e66d 100644 --- a/internal/middleware/limithandler/concurrency_limiter.go +++ b/internal/middleware/limithandler/concurrency_limiter.go @@ -7,7 +7,6 @@ import ( "sync" "time" - "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "github.com/prometheus/client_golang/prometheus" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v15/internal/helper" @@ -17,6 +16,15 @@ import ( "google.golang.org/protobuf/types/known/durationpb" ) +const ( + // TypePerRPC is a concurrency limiter whose key is the full method of gRPC server. All + // requests of the same method shares the concurrency limit. + TypePerRPC = "per-rpc" + // TypePackObjects is a dedicated concurrency limiter for pack-objects. It uses request + // information (RemoteIP/Repository/User) as the limiting key. + TypePackObjects = "pack-objects" +) + // ErrMaxQueueTime indicates a request has reached the maximum time allowed to wait in the // concurrency queue. var ErrMaxQueueTime = errors.New("maximum time in concurrency queue reached") @@ -43,7 +51,7 @@ type keyedConcurrencyLimiter struct { // acquire tries to acquire the semaphore. It may fail if the admission queue is full or if the max // queue-time ticker ticks before acquiring a concurrency token. -func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context) (returnedErr error) { +func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context, limitingKey string) (returnedErr error) { if sem.queueTokens != nil { // Try to acquire the queueing token. The queueing token is used to control how many // callers may wait for the concurrency token at the same time. If there are no more @@ -68,16 +76,16 @@ func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context) (returnedErr er } }() default: - sem.monitor.Dropped(ctx, "max_size") + sem.monitor.Dropped(ctx, limitingKey, len(sem.queueTokens), "max_size") return ErrMaxQueueSize } } // We are queued now, so let's tell the monitor. Furthermore, even though we're still // holding the queueing token when this function exits successfully we also tell the monitor - // that we have exited the queue. It is only an implemenation detail anyway that we hold on + // that we have exited the queue. It is only an implementation detail anyway that we hold on // to the token, so the monitor shouldn't care about that. - sem.monitor.Queued(ctx) + sem.monitor.Queued(ctx, limitingKey, len(sem.queueTokens)) defer sem.monitor.Dequeued(ctx) // Set up the ticker that keeps us from waiting indefinitely on the concurrency token. @@ -96,9 +104,10 @@ func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context) (returnedErr er case sem.concurrencyTokens <- struct{}{}: return nil case <-ticker.C(): - sem.monitor.Dropped(ctx, "max_time") + sem.monitor.Dropped(ctx, limitingKey, len(sem.queueTokens), "max_time") return ErrMaxQueueTime case <-ctx.Done(): + sem.monitor.Dropped(ctx, limitingKey, len(sem.queueTokens), "other") return ctx.Err() } } @@ -176,7 +185,7 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, limitingKey string, f Li start := time.Now() - if err := sem.acquire(ctx); err != nil { + if err := sem.acquire(ctx, limitingKey); err != nil { switch err { case ErrMaxQueueSize: return nil, structerr.NewResourceExhausted("%w", ErrMaxQueueSize).WithDetail(&gitalypb.LimitError{ @@ -189,8 +198,7 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, limitingKey string, f Li RetryAfter: durationpb.New(0), }) default: - ctxlogrus.Extract(ctx).WithField("limiting_key", limitingKey).WithError(err).Error("unexpected error when dequeueing request") - return nil, err + return nil, fmt.Errorf("unexpected error when dequeueing request: %w", err) } } defer sem.release() @@ -313,8 +321,10 @@ func WithConcurrencyLimiters(cfg config.Cfg, middleware *LimiterMiddleware) { limit.MaxPerRepo, limit.MaxQueueSize, newTickerFunc, - newPerRPCPromMonitor("gitaly", limit.RPC, queuedMetric, inProgressMetric, - acquiringSecondsMetric, middleware.requestsDroppedMetric), + newPerRPCPromMonitor( + "gitaly", limit.RPC, + queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric, + ), ) } @@ -327,8 +337,11 @@ func WithConcurrencyLimiters(cfg config.Cfg, middleware *LimiterMiddleware) { func() helper.Ticker { return helper.NewManualTicker() }, - newPerRPCPromMonitor("gitaly", replicateRepositoryFullMethod, queuedMetric, - inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric)) + newPerRPCPromMonitor( + "gitaly", replicateRepositoryFullMethod, + queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric, + ), + ) } middleware.methodLimiters = result diff --git a/internal/middleware/limithandler/concurrency_limiter_test.go b/internal/middleware/limithandler/concurrency_limiter_test.go index dfcdc1647..a4317d5a2 100644 --- a/internal/middleware/limithandler/concurrency_limiter_test.go +++ b/internal/middleware/limithandler/concurrency_limiter_test.go @@ -53,31 +53,31 @@ func (c *counter) currentVal() int { return c.current } -func (c *counter) Queued(ctx context.Context) { +func (c *counter) Queued(context.Context, string, int) { c.Lock() defer c.Unlock() c.queued++ } -func (c *counter) Dequeued(ctx context.Context) { +func (c *counter) Dequeued(context.Context) { c.Lock() defer c.Unlock() c.dequeued++ } -func (c *counter) Enter(ctx context.Context, acquireTime time.Duration) { +func (c *counter) Enter(context.Context, time.Duration) { c.Lock() defer c.Unlock() c.enter++ } -func (c *counter) Exit(ctx context.Context) { +func (c *counter) Exit(context.Context) { c.Lock() defer c.Unlock() c.exit++ } -func (c *counter) Dropped(ctx context.Context, reason string) { +func (c *counter) Dropped(_ context.Context, _ string, _ int, reason string) { switch reason { case "max_time": c.droppedTime++ @@ -237,7 +237,7 @@ type blockingQueueCounter struct { // Queued will block on a channel. We need a way to synchronize on when a Limiter has attempted to acquire // a semaphore but has not yet. The caller can use the channel to wait for many requests to be queued -func (b *blockingQueueCounter) Queued(_ context.Context) { +func (b *blockingQueueCounter) Queued(context.Context, string, int) { b.queuedCh <- struct{}{} } diff --git a/internal/middleware/limithandler/monitor.go b/internal/middleware/limithandler/monitor.go index 0c9ded13f..aa44d3e20 100644 --- a/internal/middleware/limithandler/monitor.go +++ b/internal/middleware/limithandler/monitor.go @@ -5,28 +5,25 @@ import ( "strings" "time" - "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "github.com/prometheus/client_golang/prometheus" ) -const acquireDurationLogThreshold = 10 * time.Millisecond - -// ConcurrencyMonitor allows the concurrency monitor to be observed +// ConcurrencyMonitor allows the concurrency monitor to be observed. type ConcurrencyMonitor interface { - Queued(ctx context.Context) + Queued(ctx context.Context, key string, length int) Dequeued(ctx context.Context) Enter(ctx context.Context, acquireTime time.Duration) Exit(ctx context.Context) - Dropped(ctx context.Context, message string) + Dropped(ctx context.Context, key string, length int, message string) } type noopConcurrencyMonitor struct{} -func (c *noopConcurrencyMonitor) Queued(ctx context.Context) {} -func (c *noopConcurrencyMonitor) Dequeued(ctx context.Context) {} -func (c *noopConcurrencyMonitor) Enter(ctx context.Context, acquireTime time.Duration) {} -func (c *noopConcurrencyMonitor) Exit(ctx context.Context) {} -func (c *noopConcurrencyMonitor) Dropped(ctx context.Context, reason string) {} +func (c *noopConcurrencyMonitor) Queued(context.Context, string, int) {} +func (c *noopConcurrencyMonitor) Dequeued(context.Context) {} +func (c *noopConcurrencyMonitor) Enter(context.Context, time.Duration) {} +func (c *noopConcurrencyMonitor) Exit(context.Context) {} +func (c *noopConcurrencyMonitor) Dropped(context.Context, string, int, string) {} // NewNoopConcurrencyMonitor returns a noopConcurrencyMonitor func NewNoopConcurrencyMonitor() ConcurrencyMonitor { @@ -35,8 +32,11 @@ func NewNoopConcurrencyMonitor() ConcurrencyMonitor { // PromMonitor keeps track of prometheus metrics for limithandlers. // It conforms to both the ConcurrencyMonitor, and prometheus.Collector -// interfaces +// interfaces. type PromMonitor struct { + // limitingType stores the type of the limiter. There are two types at the moment: per-rpc + // and pack-objects. + limitingType string queuedMetric prometheus.Gauge inProgressMetric prometheus.Gauge acquiringSecondsMetric prometheus.Observer @@ -56,6 +56,7 @@ func newPerRPCPromMonitor( serviceName, methodName := splitMethodName(fullMethod) return &PromMonitor{ + limitingType: TypePerRPC, queuedMetric: queuedMetric.WithLabelValues(system, serviceName, methodName), inProgressMetric: inProgressMetric.WithLabelValues(system, serviceName, methodName), acquiringSecondsMetric: acquiringSecondsVec.WithLabelValues(system, serviceName, methodName), @@ -68,25 +69,23 @@ func newPerRPCPromMonitor( } } -// Queued is called when a request has been queued -func (p *PromMonitor) Queued(ctx context.Context) { +// Queued is called when a request has been queued. +func (p *PromMonitor) Queued(ctx context.Context, key string, queueLength int) { + if stats := limitStatsFromContext(ctx); stats != nil { + stats.SetLimitingKey(p.limitingType, key) + stats.SetConcurrencyQueueLength(queueLength) + } p.queuedMetric.Inc() } -// Dequeued is called when a request has been dequeued +// Dequeued is called when a request has been dequeued. func (p *PromMonitor) Dequeued(ctx context.Context) { p.queuedMetric.Dec() } -// Enter is called when a request begins to be processed +// Enter is called when a request begins to be processed. func (p *PromMonitor) Enter(ctx context.Context, acquireTime time.Duration) { p.inProgressMetric.Inc() - - if acquireTime > acquireDurationLogThreshold { - logger := ctxlogrus.Extract(ctx) - logger.WithField("acquire_ms", acquireTime.Seconds()*1000).Info("Rate limit acquire wait") - } - p.acquiringSecondsMetric.Observe(acquireTime.Seconds()) if stats := limitStatsFromContext(ctx); stats != nil { @@ -94,23 +93,30 @@ func (p *PromMonitor) Enter(ctx context.Context, acquireTime time.Duration) { } } -// Exit is called when a request has finished processing +// Exit is called when a request has finished processing. func (p *PromMonitor) Exit(ctx context.Context) { p.inProgressMetric.Dec() } // Dropped is called when a request is dropped. -func (p *PromMonitor) Dropped(ctx context.Context, reason string) { +func (p *PromMonitor) Dropped(ctx context.Context, key string, length int, reason string) { + if stats := limitStatsFromContext(ctx); stats != nil { + stats.SetLimitingKey(p.limitingType, key) + stats.SetConcurrencyQueueLength(length) + stats.SetConcurrencyDroppedReason(reason) + } p.requestsDroppedMetric.WithLabelValues(reason).Inc() } func newPromMonitor( + limitingType string, keyType string, queuedVec, inProgressVec *prometheus.GaugeVec, acquiringSecondsVec *prometheus.HistogramVec, requestsDroppedVec *prometheus.CounterVec, ) *PromMonitor { return &PromMonitor{ + limitingType: limitingType, queuedMetric: queuedVec.WithLabelValues(keyType), inProgressMetric: inProgressVec.WithLabelValues(keyType), acquiringSecondsMetric: acquiringSecondsVec.WithLabelValues(keyType), @@ -119,7 +125,7 @@ func newPromMonitor( } } -// Collect collects all the metrics that PromMonitor keeps track of +// Collect collects all the metrics that PromMonitor keeps track of. func (p *PromMonitor) Collect(metrics chan<- prometheus.Metric) { p.queuedMetric.Collect(metrics) p.inProgressMetric.Collect(metrics) @@ -127,7 +133,7 @@ func (p *PromMonitor) Collect(metrics chan<- prometheus.Metric) { p.requestsDroppedMetric.Collect(metrics) } -// Describe describes all the metrics that PromMonitor keeps track of +// Describe describes all the metrics that PromMonitor keeps track of. func (p *PromMonitor) Describe(descs chan<- *prometheus.Desc) { prometheus.DescribeByCollect(p, descs) } @@ -142,7 +148,7 @@ func splitMethodName(fullMethodName string) (string, string) { } // NewPackObjectsConcurrencyMonitor returns a concurrency monitor for use -// with limiting pack objects processes +// with limiting pack objects processes. func NewPackObjectsConcurrencyMonitor(keyType string, latencyBuckets []float64) *PromMonitor { acquiringSecondsVec := prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -178,6 +184,7 @@ func NewPackObjectsConcurrencyMonitor(keyType string, latencyBuckets []float64) ).MustCurryWith(prometheus.Labels{"type": keyType}) return newPromMonitor( + TypePackObjects, keyType, queuedVec, inProgressVec, diff --git a/internal/middleware/limithandler/monitor_test.go b/internal/middleware/limithandler/monitor_test.go index ca555da0f..751332683 100644 --- a/internal/middleware/limithandler/monitor_test.go +++ b/internal/middleware/limithandler/monitor_test.go @@ -7,6 +7,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" promconfig "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config/prometheus" "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" @@ -59,11 +60,11 @@ func TestNewPerRPCPromMonitor(t *testing.T) { requestsDroppedMetric, ) - ctx := testhelper.Context(t) + ctx := InitLimitStats(testhelper.Context(t)) - rpcMonitor.Queued(ctx) + rpcMonitor.Queued(ctx, fullMethod, 5) rpcMonitor.Enter(ctx, time.Second) - rpcMonitor.Dropped(ctx, "load") + rpcMonitor.Dropped(ctx, fullMethod, 5, "load") expectedMetrics := `# HELP acquiring_seconds seconds to acquire # TYPE acquiring_seconds histogram @@ -100,19 +101,29 @@ queued{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 1 "dropped", "acquiring_seconds", )) + + stats := limitStatsFromContext(ctx) + require.NotNil(t, stats) + require.Equal(t, logrus.Fields{ + "limit.limiting_type": TypePerRPC, + "limit.limiting_key": fullMethod, + "limit.concurrency_queue_ms": int64(1000), + "limit.concurrency_queue_length": 5, + "limit.concurrency_dropped": "load", + }, stats.Fields()) } func TestNewPackObjectsConcurrencyMonitor(t *testing.T) { - ctx := testhelper.Context(t) + ctx := InitLimitStats(testhelper.Context(t)) m := NewPackObjectsConcurrencyMonitor( "user", promconfig.DefaultConfig().GRPCLatencyBuckets, ) - m.Queued(ctx) + m.Queued(ctx, "1234", 5) m.Enter(ctx, time.Second) - m.Dropped(ctx, "load") + m.Dropped(ctx, "1234", 5, "load") expectedMetrics := `# HELP gitaly_pack_objects_acquiring_seconds Histogram of time calls are rate limited (in seconds) # TYPE gitaly_pack_objects_acquiring_seconds histogram @@ -146,4 +157,14 @@ gitaly_pack_objects_queued{type="user"} 1 "gitaly_pack_objects_queued", "gitaly_pack_objects_dropped_total", )) + + stats := limitStatsFromContext(ctx) + require.NotNil(t, stats) + require.Equal(t, logrus.Fields{ + "limit.limiting_type": TypePackObjects, + "limit.limiting_key": "1234", + "limit.concurrency_queue_ms": int64(1000), + "limit.concurrency_queue_length": 5, + "limit.concurrency_dropped": "load", + }, stats.Fields()) } diff --git a/internal/middleware/limithandler/stats.go b/internal/middleware/limithandler/stats.go index 5f77766fe..30926695a 100644 --- a/internal/middleware/limithandler/stats.go +++ b/internal/middleware/limithandler/stats.go @@ -2,7 +2,7 @@ package limithandler import ( "context" - "sync/atomic" + "sync" grpcmw "github.com/grpc-ecosystem/go-grpc-middleware" "github.com/sirupsen/logrus" @@ -13,25 +13,72 @@ type limitStatsKey struct{} // LimitStats contains info about the concurrency limiter. type LimitStats struct { + sync.Mutex + // limitingKey is the key used for limiting accounting + limitingKey string + // limitingType is the type of limiter + limitingType string + // concurrencyQueueLen is the combination of in-flight requests and in-queue requests. It tells + // how busy the queue of the same limiting key is + concurrencyQueueLength int // concurrencyQueueMs milliseconds waiting in concurrency limit queue. concurrencyQueueMs int64 + // concurrencyDropped stores the dropping reason of a request + concurrencyDropped string } -// InitLimitStats initializes context with a per-RPC stats struct +// InitLimitStats initializes context with a per-RPC stats struct. func InitLimitStats(ctx context.Context) context.Context { return context.WithValue(ctx, limitStatsKey{}, &LimitStats{}) } // AddConcurrencyQueueMs adds queue time. func (s *LimitStats) AddConcurrencyQueueMs(queueMs int64) { - atomic.AddInt64(&s.concurrencyQueueMs, queueMs) + s.Lock() + defer s.Unlock() + s.concurrencyQueueMs = queueMs +} + +// SetLimitingKey set limiting key. +func (s *LimitStats) SetLimitingKey(limitingType string, limitingKey string) { + s.Lock() + defer s.Unlock() + s.limitingKey = limitingKey + s.limitingType = limitingType +} + +// SetConcurrencyQueueLength set concurrency queue length. +func (s *LimitStats) SetConcurrencyQueueLength(queueLength int) { + s.Lock() + defer s.Unlock() + s.concurrencyQueueLength = queueLength +} + +// SetConcurrencyDroppedReason sets the reason why a call has been dropped from the queue. +func (s *LimitStats) SetConcurrencyDroppedReason(reason string) { + s.Lock() + defer s.Unlock() + s.concurrencyDropped = reason } // Fields returns logging info. func (s *LimitStats) Fields() logrus.Fields { - return logrus.Fields{ - "limit.concurrency_queue_ms": s.concurrencyQueueMs, + s.Lock() + defer s.Unlock() + + if s.limitingKey == "" { + return nil + } + logs := logrus.Fields{ + "limit.limiting_type": s.limitingType, + "limit.limiting_key": s.limitingKey, + "limit.concurrency_queue_ms": s.concurrencyQueueMs, + "limit.concurrency_queue_length": s.concurrencyQueueLength, + } + if s.concurrencyDropped != "" { + logs["limit.concurrency_dropped"] = s.concurrencyDropped } + return logs } // FieldsProducer extracts stats info from the context and returns it as a logging fields. @@ -53,9 +100,9 @@ func limitStatsFromContext(ctx context.Context) *LimitStats { // stats interceptors are separate from middleware and serve one main purpose: // initialize the stats object early, so that logrus can see it. -// it must be placed before the limithandler middleware +// it must be placed before the limithandler middleware. -// StatsUnaryInterceptor returns a Unary Interceptor that initializes the context +// StatsUnaryInterceptor returns a Unary Interceptor that initializes the context. func StatsUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { ctx = InitLimitStats(ctx) @@ -64,7 +111,7 @@ func StatsUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.Unar return res, err } -// StatsStreamInterceptor returns a Stream Interceptor +// StatsStreamInterceptor returns a Stream Interceptor. func StatsStreamInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { ctx := stream.Context() ctx = InitLimitStats(ctx) diff --git a/internal/middleware/limithandler/stats_interceptor_test.go b/internal/middleware/limithandler/stats_interceptor_test.go index 5dc641100..7f6942fce 100644 --- a/internal/middleware/limithandler/stats_interceptor_test.go +++ b/internal/middleware/limithandler/stats_interceptor_test.go @@ -33,7 +33,7 @@ func createNewServer(t *testing.T, cfg config.Cfg, logger *logrus.Logger) *grpc. concurrencyLimitHandler := limithandler.New( cfg, - limithandler.LimitConcurrencyByRepo, + func(ctx context.Context) string { return "@hashed/1234" }, limithandler.WithConcurrencyLimiters, ) @@ -80,7 +80,18 @@ func TestInterceptor(t *testing.T) { t.Parallel() ctx := testhelper.Context(t) - cfg := testcfg.Build(t) + cfg := testcfg.Build(t, testcfg.WithBase(config.Cfg{ + Concurrency: []config.Concurrency{ + { + RPC: "/gitaly.RefService/RefExists", + MaxPerRepo: 1, + }, + { + RPC: "/gitaly.RefService/ListRefs", + MaxPerRepo: 1, + }, + }, + })) repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ SkipCreationViaService: true, @@ -98,7 +109,7 @@ func TestInterceptor(t *testing.T) { tests := []struct { name string performRPC func(t *testing.T, ctx context.Context, client gitalypb.RefServiceClient) - expectedLogData []string + expectedLogData map[string]any }{ { name: "Unary", @@ -108,7 +119,11 @@ func TestInterceptor(t *testing.T) { _, err := client.RefExists(ctx, req) require.NoError(t, err) }, - expectedLogData: []string{"limit.concurrency_queue_ms"}, + expectedLogData: map[string]any{ + "limit.limiting_type": "per-rpc", + "limit.limiting_key": "@hashed/1234", + "limit.concurrency_queue_length": 0, + }, }, { name: "Stream", @@ -126,7 +141,11 @@ func TestInterceptor(t *testing.T) { require.NoError(t, err) } }, - expectedLogData: []string{"limit.concurrency_queue_ms"}, + expectedLogData: map[string]any{ + "limit.limiting_type": "per-rpc", + "limit.limiting_key": "@hashed/1234", + "limit.concurrency_queue_length": 0, + }, }, } for _, tt := range tests { @@ -143,9 +162,10 @@ func TestInterceptor(t *testing.T) { logEntries := hook.AllEntries() require.Len(t, logEntries, 1) - for _, expectedLogKey := range tt.expectedLogData { - require.Contains(t, logEntries[0].Data, expectedLogKey) + for expectedLogKey, expectedLogValue := range tt.expectedLogData { + require.Equal(t, expectedLogValue, logEntries[0].Data[expectedLogKey]) } + require.GreaterOrEqual(t, logEntries[0].Data["limit.concurrency_queue_ms"], int64(0)) }) } } diff --git a/internal/middleware/limithandler/stats_test.go b/internal/middleware/limithandler/stats_test.go index 1118164ac..a51add517 100644 --- a/internal/middleware/limithandler/stats_test.go +++ b/internal/middleware/limithandler/stats_test.go @@ -15,7 +15,16 @@ func TestLimitStats(t *testing.T) { ctx = InitLimitStats(ctx) stats := limitStatsFromContext(ctx) + stats.SetLimitingKey("test-limiter", "hello-world") stats.AddConcurrencyQueueMs(13) + stats.SetConcurrencyQueueLength(99) + stats.SetConcurrencyDroppedReason("max_time") - assert.Equal(t, FieldsProducer(ctx, nil), logrus.Fields{"limit.concurrency_queue_ms": int64(13)}) + assert.Equal(t, FieldsProducer(ctx, nil), logrus.Fields{ + "limit.limiting_type": "test-limiter", + "limit.limiting_key": "hello-world", + "limit.concurrency_queue_ms": int64(13), + "limit.concurrency_queue_length": 99, + "limit.concurrency_dropped": "max_time", + }) } |