diff options
author | Quang-Minh Nguyen <qmnguyen@gitlab.com> | 2023-08-28 06:16:08 +0300 |
---|---|---|
committer | Quang-Minh Nguyen <qmnguyen@gitlab.com> | 2023-08-28 06:16:08 +0300 |
commit | 704f12ccd332c086dc06c5bd4658e7fe7bc6a70c (patch) | |
tree | 2ddde396dd79ca1a2a1c9105b96ba3e3e1894a93 | |
parent | e0f5aac979b770ab9dea403f7cb93450bd9e1b70 (diff) | |
parent | 59ba8412038c130501145d9831e5bd39ef25f6c8 (diff) |
Merge branch 'qmnguyen0711/add-in-progress-log-field' into 'master'
Add concurrency in_progress log field and fix queue_length log's semantic
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6226
Merged-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Approved-by: John Cai <jcai@gitlab.com>
Reviewed-by: John Cai <jcai@gitlab.com>
Reviewed-by: karthik nayak <knayak@gitlab.com>
-rw-r--r-- | internal/limiter/concurrency_limiter.go | 20 | ||||
-rw-r--r-- | internal/limiter/concurrency_limiter_test.go | 4 | ||||
-rw-r--r-- | internal/limiter/monitor.go | 22 | ||||
-rw-r--r-- | internal/limiter/monitor_test.go | 18 |
4 files changed, 40 insertions, 24 deletions
diff --git a/internal/limiter/concurrency_limiter.go b/internal/limiter/concurrency_limiter.go index 2807b856e..4d021582b 100644 --- a/internal/limiter/concurrency_limiter.go +++ b/internal/limiter/concurrency_limiter.go @@ -115,9 +115,17 @@ func (sem *keyedConcurrencyLimiter) release() { <-sem.concurrencyTokens } -// queueLength returns the length of token queue +// queueLength returns the length of the queue waiting for tokens. func (sem *keyedConcurrencyLimiter) queueLength() int { - return len(sem.queueTokens) + if sem.queueTokens == nil { + return 0 + } + return len(sem.queueTokens) - len(sem.concurrencyTokens) +} + +// inProgress returns the number of in-progress tokens. +func (sem *keyedConcurrencyLimiter) inProgress() int { + return len(sem.concurrencyTokens) } // ConcurrencyLimiter contains rate limiter state. @@ -189,25 +197,25 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, limitingKey string, f Li queueTime := time.Since(start) switch err { case ErrMaxQueueSize: - c.monitor.Dropped(ctx, limitingKey, sem.queueLength(), queueTime, "max_size") + c.monitor.Dropped(ctx, limitingKey, sem.queueLength(), sem.inProgress(), queueTime, "max_size") return nil, structerr.NewResourceExhausted("%w", ErrMaxQueueSize).WithDetail(&gitalypb.LimitError{ ErrorMessage: err.Error(), RetryAfter: durationpb.New(0), }) case ErrMaxQueueTime: - c.monitor.Dropped(ctx, limitingKey, sem.queueLength(), queueTime, "max_time") + c.monitor.Dropped(ctx, limitingKey, sem.queueLength(), sem.inProgress(), queueTime, "max_time") return nil, structerr.NewResourceExhausted("%w", ErrMaxQueueTime).WithDetail(&gitalypb.LimitError{ ErrorMessage: err.Error(), RetryAfter: durationpb.New(0), }) default: - c.monitor.Dropped(ctx, limitingKey, sem.queueLength(), queueTime, "other") + c.monitor.Dropped(ctx, limitingKey, sem.queueLength(), sem.inProgress(), queueTime, "other") return nil, fmt.Errorf("unexpected error when dequeueing request: %w", err) } } defer sem.release() - c.monitor.Enter(ctx, time.Since(start)) + c.monitor.Enter(ctx, sem.inProgress(), time.Since(start)) defer c.monitor.Exit(ctx) return f() } diff --git a/internal/limiter/concurrency_limiter_test.go b/internal/limiter/concurrency_limiter_test.go index f99ca7eab..8b4851f2c 100644 --- a/internal/limiter/concurrency_limiter_test.go +++ b/internal/limiter/concurrency_limiter_test.go @@ -65,7 +65,7 @@ func (c *counter) Dequeued(context.Context) { c.dequeued++ } -func (c *counter) Enter(context.Context, time.Duration) { +func (c *counter) Enter(context.Context, int, time.Duration) { c.Lock() defer c.Unlock() c.enter++ @@ -77,7 +77,7 @@ func (c *counter) Exit(context.Context) { c.exit++ } -func (c *counter) Dropped(_ context.Context, _ string, _ int, _ time.Duration, reason string) { +func (c *counter) Dropped(_ context.Context, _ string, _ int, _ int, _ time.Duration, reason string) { switch reason { case "max_time": c.droppedTime++ diff --git a/internal/limiter/monitor.go b/internal/limiter/monitor.go index b1e074d41..9230525d8 100644 --- a/internal/limiter/monitor.go +++ b/internal/limiter/monitor.go @@ -13,18 +13,18 @@ import ( type ConcurrencyMonitor interface { Queued(ctx context.Context, key string, length int) Dequeued(ctx context.Context) - Enter(ctx context.Context, acquireTime time.Duration) + Enter(ctx context.Context, inProgress int, acquireTime time.Duration) Exit(ctx context.Context) - Dropped(ctx context.Context, key string, length int, acquireTime time.Duration, message string) + Dropped(ctx context.Context, key string, queueLength int, inProgress int, acquireTime time.Duration, message string) } type noopConcurrencyMonitor struct{} -func (c *noopConcurrencyMonitor) Queued(context.Context, string, int) {} -func (c *noopConcurrencyMonitor) Dequeued(context.Context) {} -func (c *noopConcurrencyMonitor) Enter(context.Context, time.Duration) {} -func (c *noopConcurrencyMonitor) Exit(context.Context) {} -func (c *noopConcurrencyMonitor) Dropped(context.Context, string, int, time.Duration, string) {} +func (c *noopConcurrencyMonitor) Queued(context.Context, string, int) {} +func (c *noopConcurrencyMonitor) Dequeued(context.Context) {} +func (c *noopConcurrencyMonitor) Enter(context.Context, int, time.Duration) {} +func (c *noopConcurrencyMonitor) Exit(context.Context) {} +func (c *noopConcurrencyMonitor) Dropped(context.Context, string, int, int, time.Duration, string) {} // NewNoopConcurrencyMonitor returns a noopConcurrencyMonitor func NewNoopConcurrencyMonitor() ConcurrencyMonitor { @@ -86,12 +86,13 @@ func (p *PromMonitor) Dequeued(ctx context.Context) { } // Enter is called when a request begins to be processed. -func (p *PromMonitor) Enter(ctx context.Context, acquireTime time.Duration) { +func (p *PromMonitor) Enter(ctx context.Context, inProgress int, acquireTime time.Duration) { p.inProgressMetric.Inc() p.acquiringSecondsMetric.Observe(acquireTime.Seconds()) if stats := log.CustomFieldsFromContext(ctx); stats != nil { stats.RecordMetadata("limit.concurrency_queue_ms", acquireTime.Milliseconds()) + stats.RecordMetadata("limit.concurrency_in_progress", inProgress) } } @@ -101,11 +102,12 @@ func (p *PromMonitor) Exit(ctx context.Context) { } // Dropped is called when a request is dropped. -func (p *PromMonitor) Dropped(ctx context.Context, key string, length int, acquireTime time.Duration, reason string) { +func (p *PromMonitor) Dropped(ctx context.Context, key string, queueLength int, inProgress int, acquireTime time.Duration, reason string) { if stats := log.CustomFieldsFromContext(ctx); stats != nil { stats.RecordMetadata("limit.limiting_type", p.limitingType) stats.RecordMetadata("limit.limiting_key", key) - stats.RecordMetadata("limit.concurrency_queue_length", length) + stats.RecordMetadata("limit.concurrency_queue_length", queueLength) + stats.RecordMetadata("limit.concurrency_in_progress", inProgress) stats.RecordMetadata("limit.concurrency_dropped", reason) stats.RecordMetadata("limit.concurrency_queue_ms", acquireTime.Milliseconds()) } diff --git a/internal/limiter/monitor_test.go b/internal/limiter/monitor_test.go index 59fbbe7c8..44b98cec1 100644 --- a/internal/limiter/monitor_test.go +++ b/internal/limiter/monitor_test.go @@ -67,7 +67,7 @@ func TestNewPerRPCPromMonitor(t *testing.T) { ctx := log.InitContextCustomFields(testhelper.Context(t)) rpcMonitor.Queued(ctx, fullMethod, 5) - rpcMonitor.Enter(ctx, time.Second) + rpcMonitor.Enter(ctx, 10, time.Second) expectedMetrics := `# HELP acquiring_seconds seconds to acquire # TYPE acquiring_seconds histogram @@ -109,6 +109,7 @@ queued{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 1 "limit.limiting_key": fullMethod, "limit.concurrency_queue_ms": int64(1000), "limit.concurrency_queue_length": 5, + "limit.concurrency_in_progress": 10, }, stats.Fields()) // After the request exists, in_progress and queued gauge decrease @@ -136,7 +137,7 @@ queued{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 0 ctx := log.InitContextCustomFields(testhelper.Context(t)) rpcMonitor.Queued(ctx, fullMethod, 5) - rpcMonitor.Dropped(ctx, fullMethod, 5, time.Second, "load") + rpcMonitor.Dropped(ctx, fullMethod, 5, 10, time.Second, "load") expectedMetrics := `# HELP acquiring_seconds seconds to acquire # TYPE acquiring_seconds histogram @@ -181,6 +182,7 @@ queued{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 1 "limit.limiting_key": fullMethod, "limit.concurrency_queue_ms": int64(1000), "limit.concurrency_queue_length": 5, + "limit.concurrency_in_progress": 10, "limit.concurrency_dropped": "load", }, stats.Fields()) }) @@ -188,7 +190,7 @@ queued{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 1 t.Run("request is dropped before queueing", func(t *testing.T) { rpcMonitor := createNewMonitor() ctx := log.InitContextCustomFields(testhelper.Context(t)) - rpcMonitor.Dropped(ctx, fullMethod, 5, time.Second, "load") + rpcMonitor.Dropped(ctx, fullMethod, 5, 10, time.Second, "load") expectedMetrics := `# HELP acquiring_seconds seconds to acquire # TYPE acquiring_seconds histogram @@ -233,6 +235,7 @@ queued{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 0 "limit.limiting_key": fullMethod, "limit.concurrency_queue_ms": int64(1000), "limit.concurrency_queue_length": 5, + "limit.concurrency_in_progress": 10, "limit.concurrency_dropped": "load", }, stats.Fields()) }) @@ -246,7 +249,7 @@ func TestNewPackObjectsConcurrencyMonitor(t *testing.T) { ) packObjectsConcurrencyMonitor.Queued(ctx, "1234", 5) - packObjectsConcurrencyMonitor.Enter(ctx, time.Second) + packObjectsConcurrencyMonitor.Enter(ctx, 10, time.Second) expectedMetrics := `# HELP gitaly_pack_objects_acquiring_seconds Histogram of time calls are rate limited (in seconds) # TYPE gitaly_pack_objects_acquiring_seconds histogram @@ -288,6 +291,7 @@ gitaly_pack_objects_queued 1 "limit.limiting_key": "1234", "limit.concurrency_queue_ms": int64(1000), "limit.concurrency_queue_length": 5, + "limit.concurrency_in_progress": 10, }, stats.Fields()) // After the request exists, in_progress and queued gauge decrease @@ -317,7 +321,7 @@ gitaly_pack_objects_queued 0 ) packObjectsConcurrencyMonitor.Queued(ctx, "1234", 5) - packObjectsConcurrencyMonitor.Dropped(ctx, "1234", 5, time.Second, "load") + packObjectsConcurrencyMonitor.Dropped(ctx, "1234", 5, 10, time.Second, "load") expectedMetrics := `# HELP gitaly_pack_objects_acquiring_seconds Histogram of time calls are rate limited (in seconds) # TYPE gitaly_pack_objects_acquiring_seconds histogram @@ -362,6 +366,7 @@ gitaly_pack_objects_queued 1 "limit.limiting_key": "1234", "limit.concurrency_queue_ms": int64(1000), "limit.concurrency_queue_length": 5, + "limit.concurrency_in_progress": 10, "limit.concurrency_dropped": "load", }, stats.Fields()) }) @@ -372,7 +377,7 @@ gitaly_pack_objects_queued 1 promconfig.DefaultConfig().GRPCLatencyBuckets, ) - packObjectsConcurrencyMonitor.Dropped(ctx, "1234", 5, time.Second, "load") + packObjectsConcurrencyMonitor.Dropped(ctx, "1234", 5, 10, time.Second, "load") expectedMetrics := `# HELP gitaly_pack_objects_acquiring_seconds Histogram of time calls are rate limited (in seconds) # TYPE gitaly_pack_objects_acquiring_seconds histogram @@ -417,6 +422,7 @@ gitaly_pack_objects_queued 0 "limit.limiting_key": "1234", "limit.concurrency_queue_ms": int64(1000), "limit.concurrency_queue_length": 5, + "limit.concurrency_in_progress": 10, "limit.concurrency_dropped": "load", }, stats.Fields()) }) |