gitlab.com/gitlab-org/gitaly.git

author    Quang-Minh Nguyen <qmnguyen@gitlab.com>  2023-08-28 06:16:08 +0300
committer Quang-Minh Nguyen <qmnguyen@gitlab.com>  2023-08-28 06:16:08 +0300
commit    704f12ccd332c086dc06c5bd4658e7fe7bc6a70c (patch)
tree      2ddde396dd79ca1a2a1c9105b96ba3e3e1894a93
parent    e0f5aac979b770ab9dea403f7cb93450bd9e1b70 (diff)
parent    59ba8412038c130501145d9831e5bd39ef25f6c8 (diff)
Merge branch 'qmnguyen0711/add-in-progress-log-field' into 'master'

Add concurrency in_progress log field and fix queue_length log's semantics

See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6226

Merged-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Approved-by: John Cai <jcai@gitlab.com>
Reviewed-by: John Cai <jcai@gitlab.com>
Reviewed-by: karthik nayak <knayak@gitlab.com>
-rw-r--r--  internal/limiter/concurrency_limiter.go        20
-rw-r--r--  internal/limiter/concurrency_limiter_test.go    4
-rw-r--r--  internal/limiter/monitor.go                     22
-rw-r--r--  internal/limiter/monitor_test.go                18
4 files changed, 40 insertions(+), 24 deletions(-)
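
A note on the semantic fix before the diffs: the new queueLength subtracts len(concurrencyTokens) from len(queueTokens), whereas the old value was len(queueTokens) alone. That change only makes sense if a caller holds a queueTokens slot for its whole lifetime (while waiting and while executing) and a concurrencyTokens slot only while executing, so the old queue_length also counted callers that were already running. The standalone Go sketch below illustrates that accounting; it is not Gitaly code, only the channel names and the two len() expressions come from the diff, everything else is assumed.

    package main

    import "fmt"

    // sketchLimiter mimics the two-channel design from the diff: queueTokens
    // bounds how many callers may be queued or running at once, while
    // concurrencyTokens bounds how many may run at once. Capacities are
    // illustrative.
    type sketchLimiter struct {
    	queueTokens       chan struct{}
    	concurrencyTokens chan struct{}
    }

    // enqueue takes a queue slot; the caller is now counted as queued.
    func (s *sketchLimiter) enqueue() { s.queueTokens <- struct{}{} }

    // start takes a concurrency slot; the caller is now executing.
    func (s *sketchLimiter) start() { s.concurrencyTokens <- struct{}{} }

    // finish releases both slots once the caller is done.
    func (s *sketchLimiter) finish() {
    	<-s.concurrencyTokens
    	<-s.queueTokens
    }

    // queueLength counts callers that hold a queue slot but no concurrency
    // slot, i.e. the ones actually waiting.
    func (s *sketchLimiter) queueLength() int {
    	return len(s.queueTokens) - len(s.concurrencyTokens)
    }

    // inProgress counts callers currently holding a concurrency slot.
    func (s *sketchLimiter) inProgress() int {
    	return len(s.concurrencyTokens)
    }

    func main() {
    	s := &sketchLimiter{
    		queueTokens:       make(chan struct{}, 5), // up to 5 queued or running
    		concurrencyTokens: make(chan struct{}, 2), // up to 2 running
    	}
    	s.enqueue()
    	s.enqueue()
    	s.enqueue()
    	s.start() // one of the three queued callers starts executing

    	fmt.Println(s.queueLength(), s.inProgress()) // 2 1: two waiting, one running
    	// The old semantic, len(queueTokens), would have reported 3 here.

    	s.finish()
    	fmt.Println(s.queueLength(), s.inProgress()) // 2 0
    }
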
diff --git a/internal/limiter/concurrency_limiter.go b/internal/limiter/concurrency_limiter.go
index 2807b856e..4d021582b 100644
--- a/internal/limiter/concurrency_limiter.go
+++ b/internal/limiter/concurrency_limiter.go
@@ -115,9 +115,17 @@ func (sem *keyedConcurrencyLimiter) release() {
<-sem.concurrencyTokens
}
-// queueLength returns the length of token queue
+// queueLength returns the length of the queue waiting for tokens.
func (sem *keyedConcurrencyLimiter) queueLength() int {
- return len(sem.queueTokens)
+ if sem.queueTokens == nil {
+ return 0
+ }
+ return len(sem.queueTokens) - len(sem.concurrencyTokens)
+}
+
+// inProgress returns the number of in-progress tokens.
+func (sem *keyedConcurrencyLimiter) inProgress() int {
+ return len(sem.concurrencyTokens)
}
// ConcurrencyLimiter contains rate limiter state.
@@ -189,25 +197,25 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, limitingKey string, f Li
queueTime := time.Since(start)
switch err {
case ErrMaxQueueSize:
- c.monitor.Dropped(ctx, limitingKey, sem.queueLength(), queueTime, "max_size")
+ c.monitor.Dropped(ctx, limitingKey, sem.queueLength(), sem.inProgress(), queueTime, "max_size")
return nil, structerr.NewResourceExhausted("%w", ErrMaxQueueSize).WithDetail(&gitalypb.LimitError{
ErrorMessage: err.Error(),
RetryAfter: durationpb.New(0),
})
case ErrMaxQueueTime:
- c.monitor.Dropped(ctx, limitingKey, sem.queueLength(), queueTime, "max_time")
+ c.monitor.Dropped(ctx, limitingKey, sem.queueLength(), sem.inProgress(), queueTime, "max_time")
return nil, structerr.NewResourceExhausted("%w", ErrMaxQueueTime).WithDetail(&gitalypb.LimitError{
ErrorMessage: err.Error(),
RetryAfter: durationpb.New(0),
})
default:
- c.monitor.Dropped(ctx, limitingKey, sem.queueLength(), queueTime, "other")
+ c.monitor.Dropped(ctx, limitingKey, sem.queueLength(), sem.inProgress(), queueTime, "other")
return nil, fmt.Errorf("unexpected error when dequeueing request: %w", err)
}
}
defer sem.release()
- c.monitor.Enter(ctx, time.Since(start))
+ c.monitor.Enter(ctx, sem.inProgress(), time.Since(start))
defer c.monitor.Exit(ctx)
return f()
}
diff --git a/internal/limiter/concurrency_limiter_test.go b/internal/limiter/concurrency_limiter_test.go
index f99ca7eab..8b4851f2c 100644
--- a/internal/limiter/concurrency_limiter_test.go
+++ b/internal/limiter/concurrency_limiter_test.go
@@ -65,7 +65,7 @@ func (c *counter) Dequeued(context.Context) {
c.dequeued++
}
-func (c *counter) Enter(context.Context, time.Duration) {
+func (c *counter) Enter(context.Context, int, time.Duration) {
c.Lock()
defer c.Unlock()
c.enter++
@@ -77,7 +77,7 @@ func (c *counter) Exit(context.Context) {
c.exit++
}
-func (c *counter) Dropped(_ context.Context, _ string, _ int, _ time.Duration, reason string) {
+func (c *counter) Dropped(_ context.Context, _ string, _ int, _ int, _ time.Duration, reason string) {
switch reason {
case "max_time":
c.droppedTime++
diff --git a/internal/limiter/monitor.go b/internal/limiter/monitor.go
index b1e074d41..9230525d8 100644
--- a/internal/limiter/monitor.go
+++ b/internal/limiter/monitor.go
@@ -13,18 +13,18 @@ import (
type ConcurrencyMonitor interface {
Queued(ctx context.Context, key string, length int)
Dequeued(ctx context.Context)
- Enter(ctx context.Context, acquireTime time.Duration)
+ Enter(ctx context.Context, inProgress int, acquireTime time.Duration)
Exit(ctx context.Context)
- Dropped(ctx context.Context, key string, length int, acquireTime time.Duration, message string)
+ Dropped(ctx context.Context, key string, queueLength int, inProgress int, acquireTime time.Duration, message string)
}
type noopConcurrencyMonitor struct{}
-func (c *noopConcurrencyMonitor) Queued(context.Context, string, int) {}
-func (c *noopConcurrencyMonitor) Dequeued(context.Context) {}
-func (c *noopConcurrencyMonitor) Enter(context.Context, time.Duration) {}
-func (c *noopConcurrencyMonitor) Exit(context.Context) {}
-func (c *noopConcurrencyMonitor) Dropped(context.Context, string, int, time.Duration, string) {}
+func (c *noopConcurrencyMonitor) Queued(context.Context, string, int) {}
+func (c *noopConcurrencyMonitor) Dequeued(context.Context) {}
+func (c *noopConcurrencyMonitor) Enter(context.Context, int, time.Duration) {}
+func (c *noopConcurrencyMonitor) Exit(context.Context) {}
+func (c *noopConcurrencyMonitor) Dropped(context.Context, string, int, int, time.Duration, string) {}
// NewNoopConcurrencyMonitor returns a noopConcurrencyMonitor
func NewNoopConcurrencyMonitor() ConcurrencyMonitor {
@@ -86,12 +86,13 @@ func (p *PromMonitor) Dequeued(ctx context.Context) {
}
// Enter is called when a request begins to be processed.
-func (p *PromMonitor) Enter(ctx context.Context, acquireTime time.Duration) {
+func (p *PromMonitor) Enter(ctx context.Context, inProgress int, acquireTime time.Duration) {
p.inProgressMetric.Inc()
p.acquiringSecondsMetric.Observe(acquireTime.Seconds())
if stats := log.CustomFieldsFromContext(ctx); stats != nil {
stats.RecordMetadata("limit.concurrency_queue_ms", acquireTime.Milliseconds())
+ stats.RecordMetadata("limit.concurrency_in_progress", inProgress)
}
}
@@ -101,11 +102,12 @@ func (p *PromMonitor) Exit(ctx context.Context) {
}
// Dropped is called when a request is dropped.
-func (p *PromMonitor) Dropped(ctx context.Context, key string, length int, acquireTime time.Duration, reason string) {
+func (p *PromMonitor) Dropped(ctx context.Context, key string, queueLength int, inProgress int, acquireTime time.Duration, reason string) {
if stats := log.CustomFieldsFromContext(ctx); stats != nil {
stats.RecordMetadata("limit.limiting_type", p.limitingType)
stats.RecordMetadata("limit.limiting_key", key)
- stats.RecordMetadata("limit.concurrency_queue_length", length)
+ stats.RecordMetadata("limit.concurrency_queue_length", queueLength)
+ stats.RecordMetadata("limit.concurrency_in_progress", inProgress)
stats.RecordMetadata("limit.concurrency_dropped", reason)
stats.RecordMetadata("limit.concurrency_queue_ms", acquireTime.Milliseconds())
}
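
The interface change above is breaking for any custom ConcurrencyMonitor: Enter gains an inProgress argument and Dropped gains separate queueLength and inProgress arguments. A hypothetical implementation updated to the new signatures might look like the sketch below; only the method set and signatures come from the diff, the type itself and what it does with the values are illustrative.

    package limiter

    import (
    	"context"
    	"time"
    )

    // recordingMonitor is a hypothetical ConcurrencyMonitor that just keeps
    // the most recent values it was handed; a real implementation would
    // export metrics or attach log fields instead.
    type recordingMonitor struct {
    	lastQueueLength int
    	lastInProgress  int
    	lastDropReason  string
    }

    func (m *recordingMonitor) Queued(ctx context.Context, key string, length int) {
    	m.lastQueueLength = length
    }

    func (m *recordingMonitor) Dequeued(ctx context.Context) {}

    // Enter now also reports how many calls for the key are in progress.
    func (m *recordingMonitor) Enter(ctx context.Context, inProgress int, acquireTime time.Duration) {
    	m.lastInProgress = inProgress
    }

    func (m *recordingMonitor) Exit(ctx context.Context) {}

    // Dropped now reports the queue length and the in-progress count separately.
    func (m *recordingMonitor) Dropped(ctx context.Context, key string, queueLength, inProgress int, acquireTime time.Duration, reason string) {
    	m.lastQueueLength = queueLength
    	m.lastInProgress = inProgress
    	m.lastDropReason = reason
    }
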
diff --git a/internal/limiter/monitor_test.go b/internal/limiter/monitor_test.go
index 59fbbe7c8..44b98cec1 100644
--- a/internal/limiter/monitor_test.go
+++ b/internal/limiter/monitor_test.go
@@ -67,7 +67,7 @@ func TestNewPerRPCPromMonitor(t *testing.T) {
ctx := log.InitContextCustomFields(testhelper.Context(t))
rpcMonitor.Queued(ctx, fullMethod, 5)
- rpcMonitor.Enter(ctx, time.Second)
+ rpcMonitor.Enter(ctx, 10, time.Second)
expectedMetrics := `# HELP acquiring_seconds seconds to acquire
# TYPE acquiring_seconds histogram
@@ -109,6 +109,7 @@ queued{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 1
"limit.limiting_key": fullMethod,
"limit.concurrency_queue_ms": int64(1000),
"limit.concurrency_queue_length": 5,
+ "limit.concurrency_in_progress": 10,
}, stats.Fields())
// After the request exits, in_progress and queued gauges decrease
@@ -136,7 +137,7 @@ queued{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 0
ctx := log.InitContextCustomFields(testhelper.Context(t))
rpcMonitor.Queued(ctx, fullMethod, 5)
- rpcMonitor.Dropped(ctx, fullMethod, 5, time.Second, "load")
+ rpcMonitor.Dropped(ctx, fullMethod, 5, 10, time.Second, "load")
expectedMetrics := `# HELP acquiring_seconds seconds to acquire
# TYPE acquiring_seconds histogram
@@ -181,6 +182,7 @@ queued{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 1
"limit.limiting_key": fullMethod,
"limit.concurrency_queue_ms": int64(1000),
"limit.concurrency_queue_length": 5,
+ "limit.concurrency_in_progress": 10,
"limit.concurrency_dropped": "load",
}, stats.Fields())
})
@@ -188,7 +190,7 @@ queued{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 1
t.Run("request is dropped before queueing", func(t *testing.T) {
rpcMonitor := createNewMonitor()
ctx := log.InitContextCustomFields(testhelper.Context(t))
- rpcMonitor.Dropped(ctx, fullMethod, 5, time.Second, "load")
+ rpcMonitor.Dropped(ctx, fullMethod, 5, 10, time.Second, "load")
expectedMetrics := `# HELP acquiring_seconds seconds to acquire
# TYPE acquiring_seconds histogram
@@ -233,6 +235,7 @@ queued{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 0
"limit.limiting_key": fullMethod,
"limit.concurrency_queue_ms": int64(1000),
"limit.concurrency_queue_length": 5,
+ "limit.concurrency_in_progress": 10,
"limit.concurrency_dropped": "load",
}, stats.Fields())
})
@@ -246,7 +249,7 @@ func TestNewPackObjectsConcurrencyMonitor(t *testing.T) {
)
packObjectsConcurrencyMonitor.Queued(ctx, "1234", 5)
- packObjectsConcurrencyMonitor.Enter(ctx, time.Second)
+ packObjectsConcurrencyMonitor.Enter(ctx, 10, time.Second)
expectedMetrics := `# HELP gitaly_pack_objects_acquiring_seconds Histogram of time calls are rate limited (in seconds)
# TYPE gitaly_pack_objects_acquiring_seconds histogram
@@ -288,6 +291,7 @@ gitaly_pack_objects_queued 1
"limit.limiting_key": "1234",
"limit.concurrency_queue_ms": int64(1000),
"limit.concurrency_queue_length": 5,
+ "limit.concurrency_in_progress": 10,
}, stats.Fields())
// After the request exits, in_progress and queued gauges decrease
@@ -317,7 +321,7 @@ gitaly_pack_objects_queued 0
)
packObjectsConcurrencyMonitor.Queued(ctx, "1234", 5)
- packObjectsConcurrencyMonitor.Dropped(ctx, "1234", 5, time.Second, "load")
+ packObjectsConcurrencyMonitor.Dropped(ctx, "1234", 5, 10, time.Second, "load")
expectedMetrics := `# HELP gitaly_pack_objects_acquiring_seconds Histogram of time calls are rate limited (in seconds)
# TYPE gitaly_pack_objects_acquiring_seconds histogram
@@ -362,6 +366,7 @@ gitaly_pack_objects_queued 1
"limit.limiting_key": "1234",
"limit.concurrency_queue_ms": int64(1000),
"limit.concurrency_queue_length": 5,
+ "limit.concurrency_in_progress": 10,
"limit.concurrency_dropped": "load",
}, stats.Fields())
})
@@ -372,7 +377,7 @@ gitaly_pack_objects_queued 1
promconfig.DefaultConfig().GRPCLatencyBuckets,
)
- packObjectsConcurrencyMonitor.Dropped(ctx, "1234", 5, time.Second, "load")
+ packObjectsConcurrencyMonitor.Dropped(ctx, "1234", 5, 10, time.Second, "load")
expectedMetrics := `# HELP gitaly_pack_objects_acquiring_seconds Histogram of time calls are rate limited (in seconds)
# TYPE gitaly_pack_objects_acquiring_seconds histogram
@@ -417,6 +422,7 @@ gitaly_pack_objects_queued 0
"limit.limiting_key": "1234",
"limit.concurrency_queue_ms": int64(1000),
"limit.concurrency_queue_length": 5,
+ "limit.concurrency_in_progress": 10,
"limit.concurrency_dropped": "load",
}, stats.Fields())
})
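
Putting the test expectations together, a dropped request now carries the metadata fields below; limit.concurrency_in_progress is the field added by this merge request. The map simply consolidates the fixture values from the tests above, and the limiting-type value is a placeholder since the tests construct their own monitors.

    package limiter

    // Consolidated view of the fields asserted for a dropped request above.
    var exampleDroppedFields = map[string]any{
    	"limit.limiting_type":            "pack-objects", // placeholder for the monitor's type label
    	"limit.limiting_key":             "1234",
    	"limit.concurrency_queue_ms":     int64(1000),
    	"limit.concurrency_queue_length": 5,
    	"limit.concurrency_in_progress":  10, // new in this change
    	"limit.concurrency_dropped":      "load",
    }
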