diff options
Diffstat (limited to 'internal')
-rw-r--r-- | internal/middleware/limithandler/concurrency_limiter.go | 15 | ||||
-rw-r--r-- | internal/middleware/limithandler/metrics.go | 62 | ||||
-rw-r--r-- | internal/middleware/limithandler/monitor.go | 82 |
3 files changed, 82 insertions, 77 deletions
diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go index ab3a1cf3e..7088c7077 100644 --- a/internal/middleware/limithandler/concurrency_limiter.go +++ b/internal/middleware/limithandler/concurrency_limiter.go @@ -10,14 +10,6 @@ import ( // LimitedFunc represents a function that will be limited type LimitedFunc func() (resp interface{}, err error) -// ConcurrencyMonitor allows the concurrency monitor to be observed -type ConcurrencyMonitor interface { - Queued(ctx context.Context) - Dequeued(ctx context.Context) - Enter(ctx context.Context, acquireTime time.Duration) - Exit(ctx context.Context) -} - // ConcurrencyLimiter contains rate limiter state type ConcurrencyLimiter struct { semaphores map[string]*semaphoreReference @@ -119,10 +111,3 @@ func NewLimiter(max int, monitor ConcurrencyMonitor) *ConcurrencyLimiter { monitor: monitor, } } - -type nullConcurrencyMonitor struct{} - -func (c *nullConcurrencyMonitor) Queued(ctx context.Context) {} -func (c *nullConcurrencyMonitor) Dequeued(ctx context.Context) {} -func (c *nullConcurrencyMonitor) Enter(ctx context.Context, acquireTime time.Duration) {} -func (c *nullConcurrencyMonitor) Exit(ctx context.Context) {} diff --git a/internal/middleware/limithandler/metrics.go b/internal/middleware/limithandler/metrics.go index 307d00944..77fd77137 100644 --- a/internal/middleware/limithandler/metrics.go +++ b/internal/middleware/limithandler/metrics.go @@ -1,17 +1,10 @@ package limithandler import ( - "context" - "strings" - "time" - - "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" ) -const acquireDurationLogThreshold = 10 * time.Millisecond - var ( histogramVec *prometheus.HistogramVec inprogressGaugeVec = promauto.NewGaugeVec( @@ -35,20 +28,6 @@ var ( ) ) -type promMonitor struct { - queuedGauge prometheus.Gauge - inprogressGauge prometheus.Gauge - histogram prometheus.Observer -} - -func splitMethodName(fullMethodName string) (string, string) { - fullMethodName = strings.TrimPrefix(fullMethodName, "/") // remove leading slash - if i := strings.Index(fullMethodName, "/"); i >= 0 { - return fullMethodName[:i], fullMethodName[i+1:] - } - return "unknown", "unknown" -} - // EnableAcquireTimeHistogram enables histograms for acquisition times func EnableAcquireTimeHistogram(buckets []float64) { histogramOpts := prometheus.HistogramOpts{ @@ -64,44 +43,3 @@ func EnableAcquireTimeHistogram(buckets []float64) { []string{"system", "grpc_service", "grpc_method"}, ) } - -func (c *promMonitor) Queued(ctx context.Context) { - c.queuedGauge.Inc() -} - -func (c *promMonitor) Dequeued(ctx context.Context) { - c.queuedGauge.Dec() -} - -func (c *promMonitor) Enter(ctx context.Context, acquireTime time.Duration) { - c.inprogressGauge.Inc() - - if acquireTime > acquireDurationLogThreshold { - logger := ctxlogrus.Extract(ctx) - logger.WithField("acquire_ms", acquireTime.Seconds()*1000).Info("Rate limit acquire wait") - } - - if c.histogram != nil { - c.histogram.Observe(acquireTime.Seconds()) - } -} - -func (c *promMonitor) Exit(ctx context.Context) { - c.inprogressGauge.Dec() -} - -// NewPromMonitor creates a new ConcurrencyMonitor that tracks limiter -// activity in Prometheus. -func NewPromMonitor(system string, fullMethod string) ConcurrencyMonitor { - serviceName, methodName := splitMethodName(fullMethod) - - queuedGauge := queuedGaugeVec.WithLabelValues(system, serviceName, methodName) - inprogressGauge := inprogressGaugeVec.WithLabelValues(system, serviceName, methodName) - - var histogram prometheus.Observer - if histogramVec != nil { - histogram = histogramVec.WithLabelValues(system, serviceName, methodName) - } - - return &promMonitor{queuedGauge, inprogressGauge, histogram} -} diff --git a/internal/middleware/limithandler/monitor.go b/internal/middleware/limithandler/monitor.go new file mode 100644 index 000000000..8079eab37 --- /dev/null +++ b/internal/middleware/limithandler/monitor.go @@ -0,0 +1,82 @@ +package limithandler + +import ( + "context" + "strings" + "time" + + "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" + "github.com/prometheus/client_golang/prometheus" +) + +const acquireDurationLogThreshold = 10 * time.Millisecond + +// ConcurrencyMonitor allows the concurrency monitor to be observed +type ConcurrencyMonitor interface { + Queued(ctx context.Context) + Dequeued(ctx context.Context) + Enter(ctx context.Context, acquireTime time.Duration) + Exit(ctx context.Context) +} + +type nullConcurrencyMonitor struct{} + +func (c *nullConcurrencyMonitor) Queued(ctx context.Context) {} +func (c *nullConcurrencyMonitor) Dequeued(ctx context.Context) {} +func (c *nullConcurrencyMonitor) Enter(ctx context.Context, acquireTime time.Duration) {} +func (c *nullConcurrencyMonitor) Exit(ctx context.Context) {} + +type promMonitor struct { + queuedGauge prometheus.Gauge + inprogressGauge prometheus.Gauge + histogram prometheus.Observer +} + +// NewPromMonitor creates a new ConcurrencyMonitor that tracks limiter +// activity in Prometheus. +func NewPromMonitor(system string, fullMethod string) ConcurrencyMonitor { + serviceName, methodName := splitMethodName(fullMethod) + + queuedGauge := queuedGaugeVec.WithLabelValues(system, serviceName, methodName) + inprogressGauge := inprogressGaugeVec.WithLabelValues(system, serviceName, methodName) + + var histogram prometheus.Observer + if histogramVec != nil { + histogram = histogramVec.WithLabelValues(system, serviceName, methodName) + } + + return &promMonitor{queuedGauge, inprogressGauge, histogram} +} + +func (c *promMonitor) Queued(ctx context.Context) { + c.queuedGauge.Inc() +} + +func (c *promMonitor) Dequeued(ctx context.Context) { + c.queuedGauge.Dec() +} + +func (c *promMonitor) Enter(ctx context.Context, acquireTime time.Duration) { + c.inprogressGauge.Inc() + + if acquireTime > acquireDurationLogThreshold { + logger := ctxlogrus.Extract(ctx) + logger.WithField("acquire_ms", acquireTime.Seconds()*1000).Info("Rate limit acquire wait") + } + + if c.histogram != nil { + c.histogram.Observe(acquireTime.Seconds()) + } +} + +func (c *promMonitor) Exit(ctx context.Context) { + c.inprogressGauge.Dec() +} + +func splitMethodName(fullMethodName string) (string, string) { + fullMethodName = strings.TrimPrefix(fullMethodName, "/") // remove leading slash + if i := strings.Index(fullMethodName, "/"); i >= 0 { + return fullMethodName[:i], fullMethodName[i+1:] + } + return "unknown", "unknown" +} |