Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorQuang-Minh Nguyen <qmnguyen@gitlab.com>2023-05-08 06:31:25 +0300
committerQuang-Minh Nguyen <qmnguyen@gitlab.com>2023-05-08 06:31:25 +0300
commitbf616bd24e576aa6792f1a9173ca68cc32d42319 (patch)
tree3483f39b2f8b9a9c776653015d5b9f99b32de0dc
parent91b69d050acf344c09a9238f24a75c4938001113 (diff)
parent88265de0ad86df9a7a65c9533fca36bf7c9056bd (diff)
Merge branch 'qmnguyen0711/move-concurrency-logs-to-grpc-logs' into 'master'
Improve concurrency limits and pack-objects logs See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/5719 Merged-by: Quang-Minh Nguyen <qmnguyen@gitlab.com> Approved-by: Patrick Steinhardt <psteinhardt@gitlab.com> Approved-by: Pavlo Strokov <pstrokov@gitlab.com> Reviewed-by: Patrick Steinhardt <psteinhardt@gitlab.com> Reviewed-by: Pavlo Strokov <pstrokov@gitlab.com> Reviewed-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
-rw-r--r--internal/gitaly/service/hook/pack_objects.go32
-rw-r--r--internal/gitaly/service/hook/pack_objects_test.go37
-rw-r--r--internal/middleware/limithandler/concurrency_limiter.go39
-rw-r--r--internal/middleware/limithandler/concurrency_limiter_test.go12
-rw-r--r--internal/middleware/limithandler/monitor.go61
-rw-r--r--internal/middleware/limithandler/monitor_test.go33
-rw-r--r--internal/middleware/limithandler/stats.go63
-rw-r--r--internal/middleware/limithandler/stats_interceptor_test.go34
-rw-r--r--internal/middleware/limithandler/stats_test.go11
9 files changed, 213 insertions, 109 deletions
diff --git a/internal/gitaly/service/hook/pack_objects.go b/internal/gitaly/service/hook/pack_objects.go
index 32b209f67..8d1ba124e 100644
--- a/internal/gitaly/service/hook/pack_objects.go
+++ b/internal/gitaly/service/hook/pack_objects.go
@@ -15,10 +15,9 @@ import (
"strings"
"syscall"
- "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
- "github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/command"
"gitlab.com/gitlab-org/gitaly/v15/internal/git"
"gitlab.com/gitlab-org/gitaly/v15/internal/git/pktline"
gitalyhook "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/hook"
@@ -129,10 +128,16 @@ func (s *server) packObjectsHook(ctx context.Context, req *gitalypb.PackObjectsH
packObjectsCacheLookups.WithLabelValues("hit").Inc()
}
- ctxlogrus.Extract(ctx).WithFields(logrus.Fields{
- "cache_key": cacheKey,
- "bytes": servedBytes,
- }).Info("served bytes")
+ stats := command.StatsFromContext(ctx)
+ if stats != nil {
+ stats.RecordMetadata("pack_objects_cache.key", cacheKey)
+ stats.RecordSum("pack_objects_cache.served_bytes", int(servedBytes))
+ if created {
+ stats.RecordMetadata("pack_objects_cache.hit", "false")
+ } else {
+ stats.RecordMetadata("pack_objects_cache.hit", "true")
+ }
+ }
packObjectsServedBytes.Add(float64(servedBytes))
return nil
@@ -269,14 +274,13 @@ func runPackObjects(
defer func() {
packObjectsGeneratedBytes.Add(float64(counter.N))
- logger := ctxlogrus.Extract(ctx)
- logger.WithFields(logrus.Fields{
- "cache_key": key,
- "bytes": counter.N,
- }).Info("generated bytes")
-
- if total := totalMessage(stderrBuf.Bytes()); total != "" {
- logger.WithField("pack.stat", total).Info("pack file compression statistic")
+ stats := command.StatsFromContext(ctx)
+ if stats != nil {
+ stats.RecordMetadata("pack_objects_cache.key", key)
+ stats.RecordSum("pack_objects_cache.generated_bytes", int(counter.N))
+ if total := totalMessage(stderrBuf.Bytes()); total != "" {
+ stats.RecordMetadata("pack_objects.compression_statistics", total)
+ }
}
}()
diff --git a/internal/gitaly/service/hook/pack_objects_test.go b/internal/gitaly/service/hook/pack_objects_test.go
index 0545d50f7..4c0a5342d 100644
--- a/internal/gitaly/service/hook/pack_objects_test.go
+++ b/internal/gitaly/service/hook/pack_objects_test.go
@@ -15,7 +15,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
- "github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -546,34 +545,18 @@ func testServerPackObjectsHookWithSidechannelWithRuntimeDir(t *testing.T, ctx co
"-C", repoPath, "index-pack", "--stdin", "--fix-thin",
)
- for _, msg := range []string{"served bytes", "generated bytes"} {
- t.Run(msg, func(t *testing.T) {
- var entry *logrus.Entry
- for _, e := range hook.AllEntries() {
- if e.Message == msg {
- entry = e
- }
- }
+ entry := hook.LastEntry()
+ require.NotNil(t, entry)
- require.NotNil(t, entry)
- require.NotEmpty(t, entry.Data["cache_key"])
- require.Greater(t, entry.Data["bytes"], int64(0))
- })
- }
+ fields := entry.Data
+ require.Equal(t, fields["pack_objects_cache.hit"], "false")
+ require.Contains(t, fields, "pack_objects_cache.key")
+ require.Greater(t, fields["pack_objects_cache.served_bytes"], 0)
+ require.Greater(t, fields["pack_objects_cache.generated_bytes"], 0)
- t.Run("pack file compression statistic", func(t *testing.T) {
- var entry *logrus.Entry
- for _, e := range hook.AllEntries() {
- if e.Message == "pack file compression statistic" {
- entry = e
- }
- }
-
- require.NotNil(t, entry)
- total := entry.Data["pack.stat"].(string)
- require.True(t, strings.HasPrefix(total, "Total "))
- require.False(t, strings.Contains(total, "\n"))
- })
+ total := fields["pack_objects.compression_statistics"].(string)
+ require.True(t, strings.HasPrefix(total, "Total "))
+ require.False(t, strings.Contains(total, "\n"))
expectedMetrics := `# HELP gitaly_pack_objects_concurrent_processes Number of concurrent processes
# TYPE gitaly_pack_objects_concurrent_processes histogram
diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go
index d200ac8ae..086a2e66d 100644
--- a/internal/middleware/limithandler/concurrency_limiter.go
+++ b/internal/middleware/limithandler/concurrency_limiter.go
@@ -7,7 +7,6 @@ import (
"sync"
"time"
- "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
"github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v15/internal/helper"
@@ -17,6 +16,15 @@ import (
"google.golang.org/protobuf/types/known/durationpb"
)
+const (
+ // TypePerRPC is a concurrency limiter whose key is the full method of gRPC server. All
+ // requests of the same method shares the concurrency limit.
+ TypePerRPC = "per-rpc"
+ // TypePackObjects is a dedicated concurrency limiter for pack-objects. It uses request
+ // information (RemoteIP/Repository/User) as the limiting key.
+ TypePackObjects = "pack-objects"
+)
+
// ErrMaxQueueTime indicates a request has reached the maximum time allowed to wait in the
// concurrency queue.
var ErrMaxQueueTime = errors.New("maximum time in concurrency queue reached")
@@ -43,7 +51,7 @@ type keyedConcurrencyLimiter struct {
// acquire tries to acquire the semaphore. It may fail if the admission queue is full or if the max
// queue-time ticker ticks before acquiring a concurrency token.
-func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context) (returnedErr error) {
+func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context, limitingKey string) (returnedErr error) {
if sem.queueTokens != nil {
// Try to acquire the queueing token. The queueing token is used to control how many
// callers may wait for the concurrency token at the same time. If there are no more
@@ -68,16 +76,16 @@ func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context) (returnedErr er
}
}()
default:
- sem.monitor.Dropped(ctx, "max_size")
+ sem.monitor.Dropped(ctx, limitingKey, len(sem.queueTokens), "max_size")
return ErrMaxQueueSize
}
}
// We are queued now, so let's tell the monitor. Furthermore, even though we're still
// holding the queueing token when this function exits successfully we also tell the monitor
- // that we have exited the queue. It is only an implemenation detail anyway that we hold on
+ // that we have exited the queue. It is only an implementation detail anyway that we hold on
// to the token, so the monitor shouldn't care about that.
- sem.monitor.Queued(ctx)
+ sem.monitor.Queued(ctx, limitingKey, len(sem.queueTokens))
defer sem.monitor.Dequeued(ctx)
// Set up the ticker that keeps us from waiting indefinitely on the concurrency token.
@@ -96,9 +104,10 @@ func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context) (returnedErr er
case sem.concurrencyTokens <- struct{}{}:
return nil
case <-ticker.C():
- sem.monitor.Dropped(ctx, "max_time")
+ sem.monitor.Dropped(ctx, limitingKey, len(sem.queueTokens), "max_time")
return ErrMaxQueueTime
case <-ctx.Done():
+ sem.monitor.Dropped(ctx, limitingKey, len(sem.queueTokens), "other")
return ctx.Err()
}
}
@@ -176,7 +185,7 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, limitingKey string, f Li
start := time.Now()
- if err := sem.acquire(ctx); err != nil {
+ if err := sem.acquire(ctx, limitingKey); err != nil {
switch err {
case ErrMaxQueueSize:
return nil, structerr.NewResourceExhausted("%w", ErrMaxQueueSize).WithDetail(&gitalypb.LimitError{
@@ -189,8 +198,7 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, limitingKey string, f Li
RetryAfter: durationpb.New(0),
})
default:
- ctxlogrus.Extract(ctx).WithField("limiting_key", limitingKey).WithError(err).Error("unexpected error when dequeueing request")
- return nil, err
+ return nil, fmt.Errorf("unexpected error when dequeueing request: %w", err)
}
}
defer sem.release()
@@ -313,8 +321,10 @@ func WithConcurrencyLimiters(cfg config.Cfg, middleware *LimiterMiddleware) {
limit.MaxPerRepo,
limit.MaxQueueSize,
newTickerFunc,
- newPerRPCPromMonitor("gitaly", limit.RPC, queuedMetric, inProgressMetric,
- acquiringSecondsMetric, middleware.requestsDroppedMetric),
+ newPerRPCPromMonitor(
+ "gitaly", limit.RPC,
+ queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric,
+ ),
)
}
@@ -327,8 +337,11 @@ func WithConcurrencyLimiters(cfg config.Cfg, middleware *LimiterMiddleware) {
func() helper.Ticker {
return helper.NewManualTicker()
},
- newPerRPCPromMonitor("gitaly", replicateRepositoryFullMethod, queuedMetric,
- inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric))
+ newPerRPCPromMonitor(
+ "gitaly", replicateRepositoryFullMethod,
+ queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric,
+ ),
+ )
}
middleware.methodLimiters = result
diff --git a/internal/middleware/limithandler/concurrency_limiter_test.go b/internal/middleware/limithandler/concurrency_limiter_test.go
index dfcdc1647..a4317d5a2 100644
--- a/internal/middleware/limithandler/concurrency_limiter_test.go
+++ b/internal/middleware/limithandler/concurrency_limiter_test.go
@@ -53,31 +53,31 @@ func (c *counter) currentVal() int {
return c.current
}
-func (c *counter) Queued(ctx context.Context) {
+func (c *counter) Queued(context.Context, string, int) {
c.Lock()
defer c.Unlock()
c.queued++
}
-func (c *counter) Dequeued(ctx context.Context) {
+func (c *counter) Dequeued(context.Context) {
c.Lock()
defer c.Unlock()
c.dequeued++
}
-func (c *counter) Enter(ctx context.Context, acquireTime time.Duration) {
+func (c *counter) Enter(context.Context, time.Duration) {
c.Lock()
defer c.Unlock()
c.enter++
}
-func (c *counter) Exit(ctx context.Context) {
+func (c *counter) Exit(context.Context) {
c.Lock()
defer c.Unlock()
c.exit++
}
-func (c *counter) Dropped(ctx context.Context, reason string) {
+func (c *counter) Dropped(_ context.Context, _ string, _ int, reason string) {
switch reason {
case "max_time":
c.droppedTime++
@@ -237,7 +237,7 @@ type blockingQueueCounter struct {
// Queued will block on a channel. We need a way to synchronize on when a Limiter has attempted to acquire
// a semaphore but has not yet. The caller can use the channel to wait for many requests to be queued
-func (b *blockingQueueCounter) Queued(_ context.Context) {
+func (b *blockingQueueCounter) Queued(context.Context, string, int) {
b.queuedCh <- struct{}{}
}
diff --git a/internal/middleware/limithandler/monitor.go b/internal/middleware/limithandler/monitor.go
index 0c9ded13f..aa44d3e20 100644
--- a/internal/middleware/limithandler/monitor.go
+++ b/internal/middleware/limithandler/monitor.go
@@ -5,28 +5,25 @@ import (
"strings"
"time"
- "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
"github.com/prometheus/client_golang/prometheus"
)
-const acquireDurationLogThreshold = 10 * time.Millisecond
-
-// ConcurrencyMonitor allows the concurrency monitor to be observed
+// ConcurrencyMonitor allows the concurrency monitor to be observed.
type ConcurrencyMonitor interface {
- Queued(ctx context.Context)
+ Queued(ctx context.Context, key string, length int)
Dequeued(ctx context.Context)
Enter(ctx context.Context, acquireTime time.Duration)
Exit(ctx context.Context)
- Dropped(ctx context.Context, message string)
+ Dropped(ctx context.Context, key string, length int, message string)
}
type noopConcurrencyMonitor struct{}
-func (c *noopConcurrencyMonitor) Queued(ctx context.Context) {}
-func (c *noopConcurrencyMonitor) Dequeued(ctx context.Context) {}
-func (c *noopConcurrencyMonitor) Enter(ctx context.Context, acquireTime time.Duration) {}
-func (c *noopConcurrencyMonitor) Exit(ctx context.Context) {}
-func (c *noopConcurrencyMonitor) Dropped(ctx context.Context, reason string) {}
+func (c *noopConcurrencyMonitor) Queued(context.Context, string, int) {}
+func (c *noopConcurrencyMonitor) Dequeued(context.Context) {}
+func (c *noopConcurrencyMonitor) Enter(context.Context, time.Duration) {}
+func (c *noopConcurrencyMonitor) Exit(context.Context) {}
+func (c *noopConcurrencyMonitor) Dropped(context.Context, string, int, string) {}
// NewNoopConcurrencyMonitor returns a noopConcurrencyMonitor
func NewNoopConcurrencyMonitor() ConcurrencyMonitor {
@@ -35,8 +32,11 @@ func NewNoopConcurrencyMonitor() ConcurrencyMonitor {
// PromMonitor keeps track of prometheus metrics for limithandlers.
// It conforms to both the ConcurrencyMonitor, and prometheus.Collector
-// interfaces
+// interfaces.
type PromMonitor struct {
+ // limitingType stores the type of the limiter. There are two types at the moment: per-rpc
+ // and pack-objects.
+ limitingType string
queuedMetric prometheus.Gauge
inProgressMetric prometheus.Gauge
acquiringSecondsMetric prometheus.Observer
@@ -56,6 +56,7 @@ func newPerRPCPromMonitor(
serviceName, methodName := splitMethodName(fullMethod)
return &PromMonitor{
+ limitingType: TypePerRPC,
queuedMetric: queuedMetric.WithLabelValues(system, serviceName, methodName),
inProgressMetric: inProgressMetric.WithLabelValues(system, serviceName, methodName),
acquiringSecondsMetric: acquiringSecondsVec.WithLabelValues(system, serviceName, methodName),
@@ -68,25 +69,23 @@ func newPerRPCPromMonitor(
}
}
-// Queued is called when a request has been queued
-func (p *PromMonitor) Queued(ctx context.Context) {
+// Queued is called when a request has been queued.
+func (p *PromMonitor) Queued(ctx context.Context, key string, queueLength int) {
+ if stats := limitStatsFromContext(ctx); stats != nil {
+ stats.SetLimitingKey(p.limitingType, key)
+ stats.SetConcurrencyQueueLength(queueLength)
+ }
p.queuedMetric.Inc()
}
-// Dequeued is called when a request has been dequeued
+// Dequeued is called when a request has been dequeued.
func (p *PromMonitor) Dequeued(ctx context.Context) {
p.queuedMetric.Dec()
}
-// Enter is called when a request begins to be processed
+// Enter is called when a request begins to be processed.
func (p *PromMonitor) Enter(ctx context.Context, acquireTime time.Duration) {
p.inProgressMetric.Inc()
-
- if acquireTime > acquireDurationLogThreshold {
- logger := ctxlogrus.Extract(ctx)
- logger.WithField("acquire_ms", acquireTime.Seconds()*1000).Info("Rate limit acquire wait")
- }
-
p.acquiringSecondsMetric.Observe(acquireTime.Seconds())
if stats := limitStatsFromContext(ctx); stats != nil {
@@ -94,23 +93,30 @@ func (p *PromMonitor) Enter(ctx context.Context, acquireTime time.Duration) {
}
}
-// Exit is called when a request has finished processing
+// Exit is called when a request has finished processing.
func (p *PromMonitor) Exit(ctx context.Context) {
p.inProgressMetric.Dec()
}
// Dropped is called when a request is dropped.
-func (p *PromMonitor) Dropped(ctx context.Context, reason string) {
+func (p *PromMonitor) Dropped(ctx context.Context, key string, length int, reason string) {
+ if stats := limitStatsFromContext(ctx); stats != nil {
+ stats.SetLimitingKey(p.limitingType, key)
+ stats.SetConcurrencyQueueLength(length)
+ stats.SetConcurrencyDroppedReason(reason)
+ }
p.requestsDroppedMetric.WithLabelValues(reason).Inc()
}
func newPromMonitor(
+ limitingType string,
keyType string,
queuedVec, inProgressVec *prometheus.GaugeVec,
acquiringSecondsVec *prometheus.HistogramVec,
requestsDroppedVec *prometheus.CounterVec,
) *PromMonitor {
return &PromMonitor{
+ limitingType: limitingType,
queuedMetric: queuedVec.WithLabelValues(keyType),
inProgressMetric: inProgressVec.WithLabelValues(keyType),
acquiringSecondsMetric: acquiringSecondsVec.WithLabelValues(keyType),
@@ -119,7 +125,7 @@ func newPromMonitor(
}
}
-// Collect collects all the metrics that PromMonitor keeps track of
+// Collect collects all the metrics that PromMonitor keeps track of.
func (p *PromMonitor) Collect(metrics chan<- prometheus.Metric) {
p.queuedMetric.Collect(metrics)
p.inProgressMetric.Collect(metrics)
@@ -127,7 +133,7 @@ func (p *PromMonitor) Collect(metrics chan<- prometheus.Metric) {
p.requestsDroppedMetric.Collect(metrics)
}
-// Describe describes all the metrics that PromMonitor keeps track of
+// Describe describes all the metrics that PromMonitor keeps track of.
func (p *PromMonitor) Describe(descs chan<- *prometheus.Desc) {
prometheus.DescribeByCollect(p, descs)
}
@@ -142,7 +148,7 @@ func splitMethodName(fullMethodName string) (string, string) {
}
// NewPackObjectsConcurrencyMonitor returns a concurrency monitor for use
-// with limiting pack objects processes
+// with limiting pack objects processes.
func NewPackObjectsConcurrencyMonitor(keyType string, latencyBuckets []float64) *PromMonitor {
acquiringSecondsVec := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
@@ -178,6 +184,7 @@ func NewPackObjectsConcurrencyMonitor(keyType string, latencyBuckets []float64)
).MustCurryWith(prometheus.Labels{"type": keyType})
return newPromMonitor(
+ TypePackObjects,
keyType,
queuedVec,
inProgressVec,
diff --git a/internal/middleware/limithandler/monitor_test.go b/internal/middleware/limithandler/monitor_test.go
index ca555da0f..751332683 100644
--- a/internal/middleware/limithandler/monitor_test.go
+++ b/internal/middleware/limithandler/monitor_test.go
@@ -7,6 +7,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
+ "github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
promconfig "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config/prometheus"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
@@ -59,11 +60,11 @@ func TestNewPerRPCPromMonitor(t *testing.T) {
requestsDroppedMetric,
)
- ctx := testhelper.Context(t)
+ ctx := InitLimitStats(testhelper.Context(t))
- rpcMonitor.Queued(ctx)
+ rpcMonitor.Queued(ctx, fullMethod, 5)
rpcMonitor.Enter(ctx, time.Second)
- rpcMonitor.Dropped(ctx, "load")
+ rpcMonitor.Dropped(ctx, fullMethod, 5, "load")
expectedMetrics := `# HELP acquiring_seconds seconds to acquire
# TYPE acquiring_seconds histogram
@@ -100,19 +101,29 @@ queued{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 1
"dropped",
"acquiring_seconds",
))
+
+ stats := limitStatsFromContext(ctx)
+ require.NotNil(t, stats)
+ require.Equal(t, logrus.Fields{
+ "limit.limiting_type": TypePerRPC,
+ "limit.limiting_key": fullMethod,
+ "limit.concurrency_queue_ms": int64(1000),
+ "limit.concurrency_queue_length": 5,
+ "limit.concurrency_dropped": "load",
+ }, stats.Fields())
}
func TestNewPackObjectsConcurrencyMonitor(t *testing.T) {
- ctx := testhelper.Context(t)
+ ctx := InitLimitStats(testhelper.Context(t))
m := NewPackObjectsConcurrencyMonitor(
"user",
promconfig.DefaultConfig().GRPCLatencyBuckets,
)
- m.Queued(ctx)
+ m.Queued(ctx, "1234", 5)
m.Enter(ctx, time.Second)
- m.Dropped(ctx, "load")
+ m.Dropped(ctx, "1234", 5, "load")
expectedMetrics := `# HELP gitaly_pack_objects_acquiring_seconds Histogram of time calls are rate limited (in seconds)
# TYPE gitaly_pack_objects_acquiring_seconds histogram
@@ -146,4 +157,14 @@ gitaly_pack_objects_queued{type="user"} 1
"gitaly_pack_objects_queued",
"gitaly_pack_objects_dropped_total",
))
+
+ stats := limitStatsFromContext(ctx)
+ require.NotNil(t, stats)
+ require.Equal(t, logrus.Fields{
+ "limit.limiting_type": TypePackObjects,
+ "limit.limiting_key": "1234",
+ "limit.concurrency_queue_ms": int64(1000),
+ "limit.concurrency_queue_length": 5,
+ "limit.concurrency_dropped": "load",
+ }, stats.Fields())
}
diff --git a/internal/middleware/limithandler/stats.go b/internal/middleware/limithandler/stats.go
index 5f77766fe..30926695a 100644
--- a/internal/middleware/limithandler/stats.go
+++ b/internal/middleware/limithandler/stats.go
@@ -2,7 +2,7 @@ package limithandler
import (
"context"
- "sync/atomic"
+ "sync"
grpcmw "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/sirupsen/logrus"
@@ -13,25 +13,72 @@ type limitStatsKey struct{}
// LimitStats contains info about the concurrency limiter.
type LimitStats struct {
+ sync.Mutex
+ // limitingKey is the key used for limiting accounting
+ limitingKey string
+ // limitingType is the type of limiter
+ limitingType string
+ // concurrencyQueueLen is the combination of in-flight requests and in-queue requests. It tells
+ // how busy the queue of the same limiting key is
+ concurrencyQueueLength int
// concurrencyQueueMs milliseconds waiting in concurrency limit queue.
concurrencyQueueMs int64
+ // concurrencyDropped stores the dropping reason of a request
+ concurrencyDropped string
}
-// InitLimitStats initializes context with a per-RPC stats struct
+// InitLimitStats initializes context with a per-RPC stats struct.
func InitLimitStats(ctx context.Context) context.Context {
return context.WithValue(ctx, limitStatsKey{}, &LimitStats{})
}
// AddConcurrencyQueueMs adds queue time.
func (s *LimitStats) AddConcurrencyQueueMs(queueMs int64) {
- atomic.AddInt64(&s.concurrencyQueueMs, queueMs)
+ s.Lock()
+ defer s.Unlock()
+ s.concurrencyQueueMs = queueMs
+}
+
+// SetLimitingKey set limiting key.
+func (s *LimitStats) SetLimitingKey(limitingType string, limitingKey string) {
+ s.Lock()
+ defer s.Unlock()
+ s.limitingKey = limitingKey
+ s.limitingType = limitingType
+}
+
+// SetConcurrencyQueueLength set concurrency queue length.
+func (s *LimitStats) SetConcurrencyQueueLength(queueLength int) {
+ s.Lock()
+ defer s.Unlock()
+ s.concurrencyQueueLength = queueLength
+}
+
+// SetConcurrencyDroppedReason sets the reason why a call has been dropped from the queue.
+func (s *LimitStats) SetConcurrencyDroppedReason(reason string) {
+ s.Lock()
+ defer s.Unlock()
+ s.concurrencyDropped = reason
}
// Fields returns logging info.
func (s *LimitStats) Fields() logrus.Fields {
- return logrus.Fields{
- "limit.concurrency_queue_ms": s.concurrencyQueueMs,
+ s.Lock()
+ defer s.Unlock()
+
+ if s.limitingKey == "" {
+ return nil
+ }
+ logs := logrus.Fields{
+ "limit.limiting_type": s.limitingType,
+ "limit.limiting_key": s.limitingKey,
+ "limit.concurrency_queue_ms": s.concurrencyQueueMs,
+ "limit.concurrency_queue_length": s.concurrencyQueueLength,
+ }
+ if s.concurrencyDropped != "" {
+ logs["limit.concurrency_dropped"] = s.concurrencyDropped
}
+ return logs
}
// FieldsProducer extracts stats info from the context and returns it as a logging fields.
@@ -53,9 +100,9 @@ func limitStatsFromContext(ctx context.Context) *LimitStats {
// stats interceptors are separate from middleware and serve one main purpose:
// initialize the stats object early, so that logrus can see it.
-// it must be placed before the limithandler middleware
+// it must be placed before the limithandler middleware.
-// StatsUnaryInterceptor returns a Unary Interceptor that initializes the context
+// StatsUnaryInterceptor returns a Unary Interceptor that initializes the context.
func StatsUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
ctx = InitLimitStats(ctx)
@@ -64,7 +111,7 @@ func StatsUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.Unar
return res, err
}
-// StatsStreamInterceptor returns a Stream Interceptor
+// StatsStreamInterceptor returns a Stream Interceptor.
func StatsStreamInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
ctx := stream.Context()
ctx = InitLimitStats(ctx)
diff --git a/internal/middleware/limithandler/stats_interceptor_test.go b/internal/middleware/limithandler/stats_interceptor_test.go
index 5dc641100..7f6942fce 100644
--- a/internal/middleware/limithandler/stats_interceptor_test.go
+++ b/internal/middleware/limithandler/stats_interceptor_test.go
@@ -33,7 +33,7 @@ func createNewServer(t *testing.T, cfg config.Cfg, logger *logrus.Logger) *grpc.
concurrencyLimitHandler := limithandler.New(
cfg,
- limithandler.LimitConcurrencyByRepo,
+ func(ctx context.Context) string { return "@hashed/1234" },
limithandler.WithConcurrencyLimiters,
)
@@ -80,7 +80,18 @@ func TestInterceptor(t *testing.T) {
t.Parallel()
ctx := testhelper.Context(t)
- cfg := testcfg.Build(t)
+ cfg := testcfg.Build(t, testcfg.WithBase(config.Cfg{
+ Concurrency: []config.Concurrency{
+ {
+ RPC: "/gitaly.RefService/RefExists",
+ MaxPerRepo: 1,
+ },
+ {
+ RPC: "/gitaly.RefService/ListRefs",
+ MaxPerRepo: 1,
+ },
+ },
+ }))
repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
SkipCreationViaService: true,
@@ -98,7 +109,7 @@ func TestInterceptor(t *testing.T) {
tests := []struct {
name string
performRPC func(t *testing.T, ctx context.Context, client gitalypb.RefServiceClient)
- expectedLogData []string
+ expectedLogData map[string]any
}{
{
name: "Unary",
@@ -108,7 +119,11 @@ func TestInterceptor(t *testing.T) {
_, err := client.RefExists(ctx, req)
require.NoError(t, err)
},
- expectedLogData: []string{"limit.concurrency_queue_ms"},
+ expectedLogData: map[string]any{
+ "limit.limiting_type": "per-rpc",
+ "limit.limiting_key": "@hashed/1234",
+ "limit.concurrency_queue_length": 0,
+ },
},
{
name: "Stream",
@@ -126,7 +141,11 @@ func TestInterceptor(t *testing.T) {
require.NoError(t, err)
}
},
- expectedLogData: []string{"limit.concurrency_queue_ms"},
+ expectedLogData: map[string]any{
+ "limit.limiting_type": "per-rpc",
+ "limit.limiting_key": "@hashed/1234",
+ "limit.concurrency_queue_length": 0,
+ },
},
}
for _, tt := range tests {
@@ -143,9 +162,10 @@ func TestInterceptor(t *testing.T) {
logEntries := hook.AllEntries()
require.Len(t, logEntries, 1)
- for _, expectedLogKey := range tt.expectedLogData {
- require.Contains(t, logEntries[0].Data, expectedLogKey)
+ for expectedLogKey, expectedLogValue := range tt.expectedLogData {
+ require.Equal(t, expectedLogValue, logEntries[0].Data[expectedLogKey])
}
+ require.GreaterOrEqual(t, logEntries[0].Data["limit.concurrency_queue_ms"], int64(0))
})
}
}
diff --git a/internal/middleware/limithandler/stats_test.go b/internal/middleware/limithandler/stats_test.go
index 1118164ac..a51add517 100644
--- a/internal/middleware/limithandler/stats_test.go
+++ b/internal/middleware/limithandler/stats_test.go
@@ -15,7 +15,16 @@ func TestLimitStats(t *testing.T) {
ctx = InitLimitStats(ctx)
stats := limitStatsFromContext(ctx)
+ stats.SetLimitingKey("test-limiter", "hello-world")
stats.AddConcurrencyQueueMs(13)
+ stats.SetConcurrencyQueueLength(99)
+ stats.SetConcurrencyDroppedReason("max_time")
- assert.Equal(t, FieldsProducer(ctx, nil), logrus.Fields{"limit.concurrency_queue_ms": int64(13)})
+ assert.Equal(t, FieldsProducer(ctx, nil), logrus.Fields{
+ "limit.limiting_type": "test-limiter",
+ "limit.limiting_key": "hello-world",
+ "limit.concurrency_queue_ms": int64(13),
+ "limit.concurrency_queue_length": 99,
+ "limit.concurrency_dropped": "max_time",
+ })
}