Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorQuang-Minh Nguyen <qmnguyen@gitlab.com>2023-05-03 08:34:36 +0300
committerQuang-Minh Nguyen <qmnguyen@gitlab.com>2023-05-05 06:52:25 +0300
commit57d660d46e312304bb49a1e02aab907e1ff1cf20 (patch)
tree7b281633ea673a333961f9425f544ebcd81d6050
parent599c446f5ccfd6174cd9a8faae8aed1afee63d32 (diff)
Improve concurrency limiting logs
We employ multiple levels of concurrency limitation and it is crucial to have observability to comprehend how requests are being limited. Although Gitaly has some useful logs, particularly about queuing states, they are not integrated with the gRPC logs, making it difficult to understand the situation. To address this, we have unified all concurrency-limiting logs in one location through this commit. Furthermore, we have added new logging concurrent-limiting fields to enhance the logging capabilities.
-rw-r--r--internal/middleware/limithandler/concurrency_limiter.go17
-rw-r--r--internal/middleware/limithandler/concurrency_limiter_test.go12
-rw-r--r--internal/middleware/limithandler/monitor.go36
-rw-r--r--internal/middleware/limithandler/monitor_test.go31
-rw-r--r--internal/middleware/limithandler/stats.go59
-rw-r--r--internal/middleware/limithandler/stats_interceptor_test.go32
-rw-r--r--internal/middleware/limithandler/stats_test.go10
7 files changed, 142 insertions, 55 deletions
diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go
index d200ac8ae..9bae47daa 100644
--- a/internal/middleware/limithandler/concurrency_limiter.go
+++ b/internal/middleware/limithandler/concurrency_limiter.go
@@ -7,7 +7,6 @@ import (
"sync"
"time"
- "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
"github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v15/internal/helper"
@@ -43,7 +42,7 @@ type keyedConcurrencyLimiter struct {
// acquire tries to acquire the semaphore. It may fail if the admission queue is full or if the max
// queue-time ticker ticks before acquiring a concurrency token.
-func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context) (returnedErr error) {
+func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context, limitingKey string) (returnedErr error) {
if sem.queueTokens != nil {
// Try to acquire the queueing token. The queueing token is used to control how many
// callers may wait for the concurrency token at the same time. If there are no more
@@ -68,16 +67,16 @@ func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context) (returnedErr er
}
}()
default:
- sem.monitor.Dropped(ctx, "max_size")
+ sem.monitor.Dropped(ctx, limitingKey, len(sem.queueTokens), "max_size")
return ErrMaxQueueSize
}
}
// We are queued now, so let's tell the monitor. Furthermore, even though we're still
// holding the queueing token when this function exits successfully we also tell the monitor
- // that we have exited the queue. It is only an implemenation detail anyway that we hold on
+ // that we have exited the queue. It is only an implementation detail anyway that we hold on
// to the token, so the monitor shouldn't care about that.
- sem.monitor.Queued(ctx)
+ sem.monitor.Queued(ctx, limitingKey, len(sem.queueTokens))
defer sem.monitor.Dequeued(ctx)
// Set up the ticker that keeps us from waiting indefinitely on the concurrency token.
@@ -96,9 +95,10 @@ func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context) (returnedErr er
case sem.concurrencyTokens <- struct{}{}:
return nil
case <-ticker.C():
- sem.monitor.Dropped(ctx, "max_time")
+ sem.monitor.Dropped(ctx, limitingKey, len(sem.queueTokens), "max_time")
return ErrMaxQueueTime
case <-ctx.Done():
+ sem.monitor.Dropped(ctx, limitingKey, len(sem.queueTokens), "other")
return ctx.Err()
}
}
@@ -176,7 +176,7 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, limitingKey string, f Li
start := time.Now()
- if err := sem.acquire(ctx); err != nil {
+ if err := sem.acquire(ctx, limitingKey); err != nil {
switch err {
case ErrMaxQueueSize:
return nil, structerr.NewResourceExhausted("%w", ErrMaxQueueSize).WithDetail(&gitalypb.LimitError{
@@ -189,8 +189,7 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, limitingKey string, f Li
RetryAfter: durationpb.New(0),
})
default:
- ctxlogrus.Extract(ctx).WithField("limiting_key", limitingKey).WithError(err).Error("unexpected error when dequeueing request")
- return nil, err
+ return nil, fmt.Errorf("unexpected error when dequeueing request: %w", err)
}
}
defer sem.release()
diff --git a/internal/middleware/limithandler/concurrency_limiter_test.go b/internal/middleware/limithandler/concurrency_limiter_test.go
index dfcdc1647..bf80d2b81 100644
--- a/internal/middleware/limithandler/concurrency_limiter_test.go
+++ b/internal/middleware/limithandler/concurrency_limiter_test.go
@@ -53,31 +53,31 @@ func (c *counter) currentVal() int {
return c.current
}
-func (c *counter) Queued(ctx context.Context) {
+func (c *counter) Queued(context.Context, string, int) {
c.Lock()
defer c.Unlock()
c.queued++
}
-func (c *counter) Dequeued(ctx context.Context) {
+func (c *counter) Dequeued(context.Context) {
c.Lock()
defer c.Unlock()
c.dequeued++
}
-func (c *counter) Enter(ctx context.Context, acquireTime time.Duration) {
+func (c *counter) Enter(context.Context, time.Duration) {
c.Lock()
defer c.Unlock()
c.enter++
}
-func (c *counter) Exit(ctx context.Context) {
+func (c *counter) Exit(context.Context) {
c.Lock()
defer c.Unlock()
c.exit++
}
-func (c *counter) Dropped(ctx context.Context, reason string) {
+func (c *counter) Dropped(_ context.Context, _ string, _ int, reason string) {
switch reason {
case "max_time":
c.droppedTime++
@@ -237,7 +237,7 @@ type blockingQueueCounter struct {
// Queued will block on a channel. We need a way to synchronize on when a Limiter has attempted to acquire
// a semaphore but has not yet. The caller can use the channel to wait for many requests to be queued
-func (b *blockingQueueCounter) Queued(_ context.Context) {
+func (b *blockingQueueCounter) Queued(_ context.Context, _ string, _ int) {
b.queuedCh <- struct{}{}
}
diff --git a/internal/middleware/limithandler/monitor.go b/internal/middleware/limithandler/monitor.go
index 0c9ded13f..0e0a68e59 100644
--- a/internal/middleware/limithandler/monitor.go
+++ b/internal/middleware/limithandler/monitor.go
@@ -5,28 +5,25 @@ import (
"strings"
"time"
- "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
"github.com/prometheus/client_golang/prometheus"
)
-const acquireDurationLogThreshold = 10 * time.Millisecond
-
// ConcurrencyMonitor allows the concurrency monitor to be observed
type ConcurrencyMonitor interface {
- Queued(ctx context.Context)
+ Queued(ctx context.Context, key string, length int)
Dequeued(ctx context.Context)
Enter(ctx context.Context, acquireTime time.Duration)
Exit(ctx context.Context)
- Dropped(ctx context.Context, message string)
+ Dropped(ctx context.Context, key string, length int, message string)
}
type noopConcurrencyMonitor struct{}
-func (c *noopConcurrencyMonitor) Queued(ctx context.Context) {}
-func (c *noopConcurrencyMonitor) Dequeued(ctx context.Context) {}
-func (c *noopConcurrencyMonitor) Enter(ctx context.Context, acquireTime time.Duration) {}
-func (c *noopConcurrencyMonitor) Exit(ctx context.Context) {}
-func (c *noopConcurrencyMonitor) Dropped(ctx context.Context, reason string) {}
+func (c *noopConcurrencyMonitor) Queued(context.Context, string, int) {}
+func (c *noopConcurrencyMonitor) Dequeued(context.Context) {}
+func (c *noopConcurrencyMonitor) Enter(context.Context, time.Duration) {}
+func (c *noopConcurrencyMonitor) Exit(context.Context) {}
+func (c *noopConcurrencyMonitor) Dropped(context.Context, string, int, string) {}
// NewNoopConcurrencyMonitor returns a noopConcurrencyMonitor
func NewNoopConcurrencyMonitor() ConcurrencyMonitor {
@@ -69,7 +66,11 @@ func newPerRPCPromMonitor(
}
// Queued is called when a request has been queued
-func (p *PromMonitor) Queued(ctx context.Context) {
+func (p *PromMonitor) Queued(ctx context.Context, key string, queueLength int) {
+ if stats := limitStatsFromContext(ctx); stats != nil {
+ stats.SetLimitingKey(key)
+ stats.SetConcurrencyQueueLength(queueLength)
+ }
p.queuedMetric.Inc()
}
@@ -81,12 +82,6 @@ func (p *PromMonitor) Dequeued(ctx context.Context) {
// Enter is called when a request begins to be processed
func (p *PromMonitor) Enter(ctx context.Context, acquireTime time.Duration) {
p.inProgressMetric.Inc()
-
- if acquireTime > acquireDurationLogThreshold {
- logger := ctxlogrus.Extract(ctx)
- logger.WithField("acquire_ms", acquireTime.Seconds()*1000).Info("Rate limit acquire wait")
- }
-
p.acquiringSecondsMetric.Observe(acquireTime.Seconds())
if stats := limitStatsFromContext(ctx); stats != nil {
@@ -100,7 +95,12 @@ func (p *PromMonitor) Exit(ctx context.Context) {
}
// Dropped is called when a request is dropped.
-func (p *PromMonitor) Dropped(ctx context.Context, reason string) {
+func (p *PromMonitor) Dropped(ctx context.Context, key string, length int, reason string) {
+ if stats := limitStatsFromContext(ctx); stats != nil {
+ stats.SetLimitingKey(key)
+ stats.SetConcurrencyQueueLength(length)
+ stats.SetConcurrencyDroppedReason(reason)
+ }
p.requestsDroppedMetric.WithLabelValues(reason).Inc()
}
diff --git a/internal/middleware/limithandler/monitor_test.go b/internal/middleware/limithandler/monitor_test.go
index ca555da0f..2d9bc6c16 100644
--- a/internal/middleware/limithandler/monitor_test.go
+++ b/internal/middleware/limithandler/monitor_test.go
@@ -7,6 +7,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
+ "github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
promconfig "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config/prometheus"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
@@ -59,11 +60,11 @@ func TestNewPerRPCPromMonitor(t *testing.T) {
requestsDroppedMetric,
)
- ctx := testhelper.Context(t)
+ ctx := InitLimitStats(testhelper.Context(t))
- rpcMonitor.Queued(ctx)
+ rpcMonitor.Queued(ctx, fullMethod, 5)
rpcMonitor.Enter(ctx, time.Second)
- rpcMonitor.Dropped(ctx, "load")
+ rpcMonitor.Dropped(ctx, fullMethod, 5, "load")
expectedMetrics := `# HELP acquiring_seconds seconds to acquire
# TYPE acquiring_seconds histogram
@@ -100,19 +101,28 @@ queued{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 1
"dropped",
"acquiring_seconds",
))
+
+ stats := limitStatsFromContext(ctx)
+ require.NotNil(t, stats)
+ require.Equal(t, logrus.Fields{
+ "limit.limiting_key": fullMethod,
+ "limit.concurrency_queue_ms": int64(1000),
+ "limit.concurrency_queue_length": 5,
+ "limit.concurrency_dropped": "load",
+ }, stats.Fields())
}
func TestNewPackObjectsConcurrencyMonitor(t *testing.T) {
- ctx := testhelper.Context(t)
+ ctx := InitLimitStats(testhelper.Context(t))
m := NewPackObjectsConcurrencyMonitor(
"user",
promconfig.DefaultConfig().GRPCLatencyBuckets,
)
- m.Queued(ctx)
+ m.Queued(ctx, "1234", 5)
m.Enter(ctx, time.Second)
- m.Dropped(ctx, "load")
+ m.Dropped(ctx, "1234", 5, "load")
expectedMetrics := `# HELP gitaly_pack_objects_acquiring_seconds Histogram of time calls are rate limited (in seconds)
# TYPE gitaly_pack_objects_acquiring_seconds histogram
@@ -146,4 +156,13 @@ gitaly_pack_objects_queued{type="user"} 1
"gitaly_pack_objects_queued",
"gitaly_pack_objects_dropped_total",
))
+
+ stats := limitStatsFromContext(ctx)
+ require.NotNil(t, stats)
+ require.Equal(t, logrus.Fields{
+ "limit.limiting_key": "1234",
+ "limit.concurrency_queue_ms": int64(1000),
+ "limit.concurrency_queue_length": 5,
+ "limit.concurrency_dropped": "load",
+ }, stats.Fields())
}
diff --git a/internal/middleware/limithandler/stats.go b/internal/middleware/limithandler/stats.go
index 5f77766fe..07985aeef 100644
--- a/internal/middleware/limithandler/stats.go
+++ b/internal/middleware/limithandler/stats.go
@@ -2,7 +2,7 @@ package limithandler
import (
"context"
- "sync/atomic"
+ "sync"
grpcmw "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/sirupsen/logrus"
@@ -13,25 +13,68 @@ type limitStatsKey struct{}
// LimitStats contains info about the concurrency limiter.
type LimitStats struct {
+ sync.Mutex
+ // limitingKey is the key used for limiting accounting
+ limitingKey string
+ // concurrencyQueueLen is the combination of in-flight requests and in-queue requests. It tells
+ // how busy the queue of the same limiting key is
+ concurrencyQueueLength int
// concurrencyQueueMs milliseconds waiting in concurrency limit queue.
concurrencyQueueMs int64
+ // concurrencyDropped stores the dropping reason of a request
+ concurrencyDropped string
}
-// InitLimitStats initializes context with a per-RPC stats struct
+// InitLimitStats initializes context with a per-RPC stats struct.
func InitLimitStats(ctx context.Context) context.Context {
return context.WithValue(ctx, limitStatsKey{}, &LimitStats{})
}
// AddConcurrencyQueueMs adds queue time.
func (s *LimitStats) AddConcurrencyQueueMs(queueMs int64) {
- atomic.AddInt64(&s.concurrencyQueueMs, queueMs)
+ s.Lock()
+ defer s.Unlock()
+ s.concurrencyQueueMs = queueMs
+}
+
+// SetLimitingKey set limiting key.
+func (s *LimitStats) SetLimitingKey(limitingKey string) {
+ s.Lock()
+ defer s.Unlock()
+ s.limitingKey = limitingKey
+}
+
+// SetConcurrencyQueueLength set concurrency queue length.
+func (s *LimitStats) SetConcurrencyQueueLength(queueLength int) {
+ s.Lock()
+ defer s.Unlock()
+ s.concurrencyQueueLength = queueLength
+}
+
+// SetConcurrencyDroppedReason sets the reason why a call has been dropped from the queue.
+func (s *LimitStats) SetConcurrencyDroppedReason(reason string) {
+ s.Lock()
+ defer s.Unlock()
+ s.concurrencyDropped = reason
}
// Fields returns logging info.
func (s *LimitStats) Fields() logrus.Fields {
- return logrus.Fields{
- "limit.concurrency_queue_ms": s.concurrencyQueueMs,
+ s.Lock()
+ defer s.Unlock()
+
+ if s.limitingKey == "" {
+ return nil
+ }
+ logs := logrus.Fields{
+ "limit.limiting_key": s.limitingKey,
+ "limit.concurrency_queue_ms": s.concurrencyQueueMs,
+ "limit.concurrency_queue_length": s.concurrencyQueueLength,
+ }
+ if s.concurrencyDropped != "" {
+ logs["limit.concurrency_dropped"] = s.concurrencyDropped
}
+ return logs
}
// FieldsProducer extracts stats info from the context and returns it as a logging fields.
@@ -53,9 +96,9 @@ func limitStatsFromContext(ctx context.Context) *LimitStats {
// stats interceptors are separate from middleware and serve one main purpose:
// initialize the stats object early, so that logrus can see it.
-// it must be placed before the limithandler middleware
+// it must be placed before the limithandler middleware.
-// StatsUnaryInterceptor returns a Unary Interceptor that initializes the context
+// StatsUnaryInterceptor returns a Unary Interceptor that initializes the context.
func StatsUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
ctx = InitLimitStats(ctx)
@@ -64,7 +107,7 @@ func StatsUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.Unar
return res, err
}
-// StatsStreamInterceptor returns a Stream Interceptor
+// StatsStreamInterceptor returns a Stream Interceptor.
func StatsStreamInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
ctx := stream.Context()
ctx = InitLimitStats(ctx)
diff --git a/internal/middleware/limithandler/stats_interceptor_test.go b/internal/middleware/limithandler/stats_interceptor_test.go
index 5dc641100..916bae1fc 100644
--- a/internal/middleware/limithandler/stats_interceptor_test.go
+++ b/internal/middleware/limithandler/stats_interceptor_test.go
@@ -33,7 +33,7 @@ func createNewServer(t *testing.T, cfg config.Cfg, logger *logrus.Logger) *grpc.
concurrencyLimitHandler := limithandler.New(
cfg,
- limithandler.LimitConcurrencyByRepo,
+ func(ctx context.Context) string { return "@hashed/1234" },
limithandler.WithConcurrencyLimiters,
)
@@ -80,7 +80,18 @@ func TestInterceptor(t *testing.T) {
t.Parallel()
ctx := testhelper.Context(t)
- cfg := testcfg.Build(t)
+ cfg := testcfg.Build(t, testcfg.WithBase(config.Cfg{
+ Concurrency: []config.Concurrency{
+ {
+ RPC: "/gitaly.RefService/RefExists",
+ MaxPerRepo: 1,
+ },
+ {
+ RPC: "/gitaly.RefService/ListRefs",
+ MaxPerRepo: 1,
+ },
+ },
+ }))
repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
SkipCreationViaService: true,
@@ -98,7 +109,7 @@ func TestInterceptor(t *testing.T) {
tests := []struct {
name string
performRPC func(t *testing.T, ctx context.Context, client gitalypb.RefServiceClient)
- expectedLogData []string
+ expectedLogData map[string]any
}{
{
name: "Unary",
@@ -108,7 +119,10 @@ func TestInterceptor(t *testing.T) {
_, err := client.RefExists(ctx, req)
require.NoError(t, err)
},
- expectedLogData: []string{"limit.concurrency_queue_ms"},
+ expectedLogData: map[string]any{
+ "limit.limiting_key": "@hashed/1234",
+ "limit.concurrency_queue_length": 0,
+ },
},
{
name: "Stream",
@@ -126,7 +140,10 @@ func TestInterceptor(t *testing.T) {
require.NoError(t, err)
}
},
- expectedLogData: []string{"limit.concurrency_queue_ms"},
+ expectedLogData: map[string]any{
+ "limit.limiting_key": "@hashed/1234",
+ "limit.concurrency_queue_length": 0,
+ },
},
}
for _, tt := range tests {
@@ -143,9 +160,10 @@ func TestInterceptor(t *testing.T) {
logEntries := hook.AllEntries()
require.Len(t, logEntries, 1)
- for _, expectedLogKey := range tt.expectedLogData {
- require.Contains(t, logEntries[0].Data, expectedLogKey)
+ for expectedLogKey, expectedLogValue := range tt.expectedLogData {
+ require.Equal(t, expectedLogValue, logEntries[0].Data[expectedLogKey])
}
+ require.GreaterOrEqual(t, logEntries[0].Data["limit.concurrency_queue_ms"], int64(0))
})
}
}
diff --git a/internal/middleware/limithandler/stats_test.go b/internal/middleware/limithandler/stats_test.go
index 1118164ac..66e07ce08 100644
--- a/internal/middleware/limithandler/stats_test.go
+++ b/internal/middleware/limithandler/stats_test.go
@@ -15,7 +15,15 @@ func TestLimitStats(t *testing.T) {
ctx = InitLimitStats(ctx)
stats := limitStatsFromContext(ctx)
+ stats.SetLimitingKey("hello-world")
stats.AddConcurrencyQueueMs(13)
+ stats.SetConcurrencyQueueLength(99)
+ stats.SetConcurrencyDroppedReason("max_time")
- assert.Equal(t, FieldsProducer(ctx, nil), logrus.Fields{"limit.concurrency_queue_ms": int64(13)})
+ assert.Equal(t, FieldsProducer(ctx, nil), logrus.Fields{
+ "limit.limiting_key": "hello-world",
+ "limit.concurrency_queue_ms": int64(13),
+ "limit.concurrency_queue_length": 99,
+ "limit.concurrency_dropped": "max_time",
+ })
}