diff options
author | John Cai <jcai@gitlab.com> | 2021-12-16 21:38:14 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2021-12-16 21:38:14 +0300 |
commit | 61471fff6a262bc2c9c5270016f8a677e48638bb (patch) | |
tree | 2ce78c49318abb1916885bde6f883c5c66e64ce7 /internal | |
parent | 1762b3a49fef1f12c2dc6ac153807b2c8c25b1fe (diff) | |
parent | 2766b879b037ad07e4313e76debbede691b40000 (diff) |
Merge branch 'pks-limithandler-remove-globals' into 'master'
limithandler: Refactor package to not use global state
See merge request gitlab-org/gitaly!4197
Diffstat (limited to 'internal')
-rw-r--r-- | internal/gitaly/config/concurrency.go | 22 | ||||
-rw-r--r-- | internal/gitaly/config/prometheus/config.go | 3 | ||||
-rw-r--r-- | internal/gitaly/server/auth_test.go | 15 | ||||
-rw-r--r-- | internal/gitaly/server/server.go | 23 | ||||
-rw-r--r-- | internal/gitaly/server/server_factory.go | 8 | ||||
-rw-r--r-- | internal/gitaly/server/server_factory_test.go | 41 | ||||
-rw-r--r-- | internal/gitaly/service/dependencies.go | 7 | ||||
-rw-r--r-- | internal/gitaly/service/repository/create_fork_test.go | 4 | ||||
-rw-r--r-- | internal/middleware/limithandler/concurrency_limiter.go | 15 | ||||
-rw-r--r-- | internal/middleware/limithandler/concurrency_limiter_test.go | 5 | ||||
-rw-r--r-- | internal/middleware/limithandler/limithandler.go | 117 | ||||
-rw-r--r-- | internal/middleware/limithandler/metrics.go | 107 | ||||
-rw-r--r-- | internal/middleware/limithandler/middleware.go | 185 | ||||
-rw-r--r-- | internal/middleware/limithandler/middleware_test.go (renamed from internal/middleware/limithandler/limithandler_test.go) | 26 | ||||
-rw-r--r-- | internal/middleware/limithandler/monitor.go | 76 | ||||
-rw-r--r-- | internal/testhelper/testserver/gitaly.go | 8 |
16 files changed, 360 insertions, 302 deletions
diff --git a/internal/gitaly/config/concurrency.go b/internal/gitaly/config/concurrency.go deleted file mode 100644 index 1c54a0250..000000000 --- a/internal/gitaly/config/concurrency.go +++ /dev/null @@ -1,22 +0,0 @@ -package config - -import ( - "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler" -) - -// ConfigureConcurrencyLimits configures the per-repo, per RPC rate limits -func ConfigureConcurrencyLimits(cfg Cfg) { - maxConcurrencyPerRepoPerRPC := make(map[string]int) - - for _, v := range cfg.Concurrency { - maxConcurrencyPerRepoPerRPC[v.RPC] = v.MaxPerRepo - } - - // Set default for ReplicateRepository - replicateRepositoryFullMethod := "/gitaly.RepositoryService/ReplicateRepository" - if _, ok := maxConcurrencyPerRepoPerRPC[replicateRepositoryFullMethod]; !ok { - maxConcurrencyPerRepoPerRPC[replicateRepositoryFullMethod] = 1 - } - - limithandler.SetMaxRepoConcurrency(maxConcurrencyPerRepoPerRPC) -} diff --git a/internal/gitaly/config/prometheus/config.go b/internal/gitaly/config/prometheus/config.go index 8d3c91fc4..4873eec1d 100644 --- a/internal/gitaly/config/prometheus/config.go +++ b/internal/gitaly/config/prometheus/config.go @@ -6,7 +6,6 @@ import ( grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" - "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler" ) // Config contains additional configuration data for prometheus @@ -40,6 +39,4 @@ func (c *Config) Configure() { grpcprometheus.EnableClientHandlingTimeHistogram(func(histogramOpts *prometheus.HistogramOpts) { histogramOpts.Buckets = c.GRPCLatencyBuckets }) - - limithandler.EnableAcquireTimeHistogram(c.GRPCLatencyBuckets) } diff --git a/internal/gitaly/server/auth_test.go b/internal/gitaly/server/auth_test.go index 016527da5..44ffcf08c 100644 --- a/internal/gitaly/server/auth_test.go +++ b/internal/gitaly/server/auth_test.go @@ -26,6 +26,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v14/internal/gitlab" + "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" @@ -200,8 +201,9 @@ func runServer(t *testing.T, cfg config.Cfg) string { catfileCache := catfile.NewCache(cfg) t.Cleanup(catfileCache.Stop) diskCache := cache.New(cfg, locator) + limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo) - srv, err := New(false, cfg, testhelper.NewDiscardingLogEntry(t), registry, diskCache) + srv, err := New(false, cfg, testhelper.NewDiscardingLogEntry(t), registry, diskCache, limitHandler) require.NoError(t, err) setup.RegisterAll(srv, &service.Dependencies{ @@ -235,7 +237,14 @@ func runSecureServer(t *testing.T, cfg config.Cfg) string { conns := client.NewPool() t.Cleanup(func() { conns.Close() }) - srv, err := New(true, cfg, testhelper.NewDiscardingLogEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg))) + srv, err := New( + true, + cfg, + testhelper.NewDiscardingLogEntry(t), + backchannel.NewRegistry(), + cache.New(cfg, config.NewLocator(cfg)), + limithandler.New(cfg, limithandler.LimitConcurrencyByRepo), + ) require.NoError(t, err) healthpb.RegisterHealthServer(srv, health.NewServer()) @@ -307,8 +316,6 @@ func TestAuthBeforeLimit(t *testing.T) { }, )) - config.ConfigureConcurrencyLimits(cfg) - gitlabURL, cleanup := gitlab.SetupAndStartGitlabServer(t, cfg.GitlabShell.Dir, &gitlab.TestServerOptions{ SecretToken: "secretToken", GLID: gittest.GlID, diff --git a/internal/gitaly/server/server.go b/internal/gitaly/server/server.go index 9dcf07aa4..06a665dcf 100644 --- a/internal/gitaly/server/server.go +++ b/internal/gitaly/server/server.go @@ -1,7 +1,6 @@ package server import ( - "context" "crypto/tls" "fmt" "time" @@ -37,21 +36,6 @@ import ( "google.golang.org/grpc/keepalive" ) -func concurrencyKeyFn(ctx context.Context) string { - tags := grpcmwtags.Extract(ctx) - ctxValue := tags.Values()["grpc.request.repoPath"] - if ctxValue == nil { - return "" - } - - s, ok := ctxValue.(string) - if ok { - return s - } - - return "" -} - func init() { for _, l := range gitalylog.Loggers { urlSanitizer := logsanitizer.NewURLSanitizerHook() @@ -75,13 +59,12 @@ func New( logrusEntry *log.Entry, registry *backchannel.Registry, cacheInvalidator diskcache.Invalidator, + limitHandler *limithandler.LimiterMiddleware, ) (*grpc.Server, error) { ctxTagOpts := []grpcmwtags.Option{ grpcmwtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor), } - lh := limithandler.New(concurrencyKeyFn) - transportCredentials := insecure.NewCredentials() // If tls config is specified attempt to extract tls options and use it // as a grpc.ServerOption @@ -132,7 +115,7 @@ func New( sentryhandler.StreamLogHandler, cancelhandler.Stream, // Should be below LogHandler auth.StreamServerInterceptor(cfg.Auth), - lh.StreamInterceptor(), // Should be below auth handler to prevent v2 hmac tokens from timing out while queued + limitHandler.StreamInterceptor(), // Should be below auth handler to prevent v2 hmac tokens from timing out while queued grpctracing.StreamServerTracingInterceptor(), cache.StreamInvalidator(cacheInvalidator, protoregistry.GitalyProtoPreregistered), // Panic handler should remain last so that application panics will be @@ -153,7 +136,7 @@ func New( sentryhandler.UnaryLogHandler, cancelhandler.Unary, // Should be below LogHandler auth.UnaryServerInterceptor(cfg.Auth), - lh.UnaryInterceptor(), // Should be below auth handler to prevent v2 hmac tokens from timing out while queued + limitHandler.UnaryInterceptor(), // Should be below auth handler to prevent v2 hmac tokens from timing out while queued grpctracing.UnaryServerTracingInterceptor(), cache.UnaryInvalidator(cacheInvalidator, protoregistry.GitalyProtoPreregistered), // Panic handler should remain last so that application panics will be diff --git a/internal/gitaly/server/server_factory.go b/internal/gitaly/server/server_factory.go index 2b2d9e73d..279ad9f70 100644 --- a/internal/gitaly/server/server_factory.go +++ b/internal/gitaly/server/server_factory.go @@ -15,6 +15,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/maintenance" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "google.golang.org/grpc" ) @@ -23,6 +24,7 @@ import ( type GitalyServerFactory struct { registry *backchannel.Registry cacheInvalidator cache.Invalidator + limitHandler *limithandler.LimiterMiddleware cfg config.Cfg logger *logrus.Entry externalServers []*grpc.Server @@ -36,12 +38,14 @@ func NewGitalyServerFactory( logger *logrus.Entry, registry *backchannel.Registry, cacheInvalidator cache.Invalidator, + limitHandler *limithandler.LimiterMiddleware, ) *GitalyServerFactory { return &GitalyServerFactory{ cfg: cfg, logger: logger, registry: registry, cacheInvalidator: cacheInvalidator, + limitHandler: limitHandler, } } @@ -136,7 +140,7 @@ func (s *GitalyServerFactory) GracefulStop() { // CreateExternal creates a new external gRPC server. The external servers are closed // before the internal servers when gracefully shutting down. func (s *GitalyServerFactory) CreateExternal(secure bool) (*grpc.Server, error) { - server, err := New(secure, s.cfg, s.logger, s.registry, s.cacheInvalidator) + server, err := New(secure, s.cfg, s.logger, s.registry, s.cacheInvalidator, s.limitHandler) if err != nil { return nil, err } @@ -148,7 +152,7 @@ func (s *GitalyServerFactory) CreateExternal(secure bool) (*grpc.Server, error) // CreateInternal creates a new internal gRPC server. Internal servers are closed // after the external ones when gracefully shutting down. func (s *GitalyServerFactory) CreateInternal() (*grpc.Server, error) { - server, err := New(false, s.cfg, s.logger, s.registry, s.cacheInvalidator) + server, err := New(false, s.cfg, s.logger, s.registry, s.cacheInvalidator, s.limitHandler) if err != nil { return nil, err } diff --git a/internal/gitaly/server/server_factory_test.go b/internal/gitaly/server/server_factory_test.go index eed519d4d..6e29b9cd3 100644 --- a/internal/gitaly/server/server_factory_test.go +++ b/internal/gitaly/server/server_factory_test.go @@ -18,6 +18,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/bootstrap/starter" "gitlab.com/gitlab-org/gitaly/v14/internal/cache" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" "google.golang.org/grpc" @@ -88,7 +89,13 @@ func TestGitalyServerFactory(t *testing.T) { t.Run("insecure", func(t *testing.T) { cfg := testcfg.Build(t) - sf := NewGitalyServerFactory(cfg, testhelper.NewDiscardingLogEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg))) + sf := NewGitalyServerFactory( + cfg, + testhelper.NewDiscardingLogEntry(t), + backchannel.NewRegistry(), + cache.New(cfg, config.NewLocator(cfg)), + limithandler.New(cfg, limithandler.LimitConcurrencyByRepo), + ) checkHealth(t, sf, starter.TCP, "localhost:0") }) @@ -101,7 +108,13 @@ func TestGitalyServerFactory(t *testing.T) { KeyPath: keyFile, }})) - sf := NewGitalyServerFactory(cfg, testhelper.NewDiscardingLogEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg))) + sf := NewGitalyServerFactory( + cfg, + testhelper.NewDiscardingLogEntry(t), + backchannel.NewRegistry(), + cache.New(cfg, config.NewLocator(cfg)), + limithandler.New(cfg, limithandler.LimitConcurrencyByRepo), + ) t.Cleanup(sf.Stop) checkHealth(t, sf, starter.TLS, "localhost:0") @@ -109,7 +122,13 @@ func TestGitalyServerFactory(t *testing.T) { t.Run("all services must be stopped", func(t *testing.T) { cfg := testcfg.Build(t) - sf := NewGitalyServerFactory(cfg, testhelper.NewDiscardingLogEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg))) + sf := NewGitalyServerFactory( + cfg, + testhelper.NewDiscardingLogEntry(t), + backchannel.NewRegistry(), + cache.New(cfg, config.NewLocator(cfg)), + limithandler.New(cfg, limithandler.LimitConcurrencyByRepo), + ) t.Cleanup(sf.Stop) tcpHealthClient := checkHealth(t, sf, starter.TCP, "localhost:0") @@ -131,7 +150,13 @@ func TestGitalyServerFactory(t *testing.T) { t.Run("logging check", func(t *testing.T) { cfg := testcfg.Build(t) logger, hook := test.NewNullLogger() - sf := NewGitalyServerFactory(cfg, logger.WithContext(ctx), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg))) + sf := NewGitalyServerFactory( + cfg, + logger.WithContext(ctx), + backchannel.NewRegistry(), + cache.New(cfg, config.NewLocator(cfg)), + limithandler.New(cfg, limithandler.LimitConcurrencyByRepo), + ) checkHealth(t, sf, starter.TCP, "localhost:0") @@ -160,7 +185,13 @@ func TestGitalyServerFactory_closeOrder(t *testing.T) { defer cancel() cfg := testcfg.Build(t) - sf := NewGitalyServerFactory(cfg, testhelper.NewDiscardingLogEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg))) + sf := NewGitalyServerFactory( + cfg, + testhelper.NewDiscardingLogEntry(t), + backchannel.NewRegistry(), + cache.New(cfg, config.NewLocator(cfg)), + limithandler.New(cfg, limithandler.LimitConcurrencyByRepo), + ) defer sf.Stop() errQuickRPC := status.Error(codes.Internal, "quick RPC") diff --git a/internal/gitaly/service/dependencies.go b/internal/gitaly/service/dependencies.go index dcdec1783..3228f5e4b 100644 --- a/internal/gitaly/service/dependencies.go +++ b/internal/gitaly/service/dependencies.go @@ -13,6 +13,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v14/internal/gitlab" + "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler" "gitlab.com/gitlab-org/gitaly/v14/internal/streamcache" ) @@ -31,6 +32,7 @@ type Dependencies struct { CatfileCache catfile.Cache DiskCache cache.Cache PackObjectsCache streamcache.Cache + LimitHandler *limithandler.LimiterMiddleware } // GetCfg returns service configuration. @@ -97,3 +99,8 @@ func (dc *Dependencies) GetDiskCache() cache.Cache { func (dc *Dependencies) GetPackObjectsCache() streamcache.Cache { return dc.PackObjectsCache } + +// GetLimitHandler returns the RPC limit handler. +func (dc *Dependencies) GetLimitHandler() *limithandler.LimiterMiddleware { + return dc.LimitHandler +} diff --git a/internal/gitaly/service/repository/create_fork_test.go b/internal/gitaly/service/repository/create_fork_test.go index dca8986e4..ec8721697 100644 --- a/internal/gitaly/service/repository/create_fork_test.go +++ b/internal/gitaly/service/repository/create_fork_test.go @@ -32,6 +32,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/internal/helper/text" "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" + "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/praefectutil" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" @@ -270,7 +271,8 @@ func runSecureServer(t *testing.T, cfg config.Cfg, rubySrv *rubyserver.Server) s registry := backchannel.NewRegistry() locator := config.NewLocator(cfg) cache := cache.New(cfg, locator) - server, err := gserver.New(true, cfg, testhelper.NewDiscardingLogEntry(t), registry, cache) + limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo) + server, err := gserver.New(true, cfg, testhelper.NewDiscardingLogEntry(t), registry, cache, limitHandler) require.NoError(t, err) listener, addr := testhelper.GetLocalhostListener(t) diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go index ab3a1cf3e..7088c7077 100644 --- a/internal/middleware/limithandler/concurrency_limiter.go +++ b/internal/middleware/limithandler/concurrency_limiter.go @@ -10,14 +10,6 @@ import ( // LimitedFunc represents a function that will be limited type LimitedFunc func() (resp interface{}, err error) -// ConcurrencyMonitor allows the concurrency monitor to be observed -type ConcurrencyMonitor interface { - Queued(ctx context.Context) - Dequeued(ctx context.Context) - Enter(ctx context.Context, acquireTime time.Duration) - Exit(ctx context.Context) -} - // ConcurrencyLimiter contains rate limiter state type ConcurrencyLimiter struct { semaphores map[string]*semaphoreReference @@ -119,10 +111,3 @@ func NewLimiter(max int, monitor ConcurrencyMonitor) *ConcurrencyLimiter { monitor: monitor, } } - -type nullConcurrencyMonitor struct{} - -func (c *nullConcurrencyMonitor) Queued(ctx context.Context) {} -func (c *nullConcurrencyMonitor) Dequeued(ctx context.Context) {} -func (c *nullConcurrencyMonitor) Enter(ctx context.Context, acquireTime time.Duration) {} -func (c *nullConcurrencyMonitor) Exit(ctx context.Context) {} diff --git a/internal/middleware/limithandler/concurrency_limiter_test.go b/internal/middleware/limithandler/concurrency_limiter_test.go index bff19c688..638e22b89 100644 --- a/internal/middleware/limithandler/concurrency_limiter_test.go +++ b/internal/middleware/limithandler/concurrency_limiter_test.go @@ -70,6 +70,8 @@ func (c *counter) Exit(ctx context.Context) { } func TestLimiter(t *testing.T) { + t.Parallel() + tests := []struct { name string concurrency int @@ -123,7 +125,10 @@ func TestLimiter(t *testing.T) { }, } for _, tt := range tests { + tt := tt t.Run(tt.name, func(t *testing.T) { + t.Parallel() + ctx, cancel := testhelper.Context() defer cancel() diff --git a/internal/middleware/limithandler/limithandler.go b/internal/middleware/limithandler/limithandler.go deleted file mode 100644 index 7d8d6ab8f..000000000 --- a/internal/middleware/limithandler/limithandler.go +++ /dev/null @@ -1,117 +0,0 @@ -package limithandler - -import ( - "context" - - "google.golang.org/grpc" -) - -// GetLockKey function defines the lock key of an RPC invocation based on its context -type GetLockKey func(context.Context) string - -// LimiterMiddleware contains rate limiter state -type LimiterMiddleware struct { - methodLimiters map[string]*ConcurrencyLimiter - getLockKey GetLockKey -} - -type wrappedStream struct { - grpc.ServerStream - info *grpc.StreamServerInfo - limiterMiddleware *LimiterMiddleware - initial bool -} - -var maxConcurrencyPerRepoPerRPC map[string]int - -// UnaryInterceptor returns a Unary Interceptor -func (c *LimiterMiddleware) UnaryInterceptor() grpc.UnaryServerInterceptor { - return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { - lockKey := c.getLockKey(ctx) - if lockKey == "" { - return handler(ctx, req) - } - - limiter := c.methodLimiters[info.FullMethod] - if limiter == nil { - // No concurrency limiting - return handler(ctx, req) - } - - return limiter.Limit(ctx, lockKey, func() (interface{}, error) { - return handler(ctx, req) - }) - } -} - -// StreamInterceptor returns a Stream Interceptor -func (c *LimiterMiddleware) StreamInterceptor() grpc.StreamServerInterceptor { - return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { - wrapper := &wrappedStream{stream, info, c, true} - return handler(srv, wrapper) - } -} - -func (w *wrappedStream) RecvMsg(m interface{}) error { - if err := w.ServerStream.RecvMsg(m); err != nil { - return err - } - - // Only perform limiting on the first request of a stream - if !w.initial { - return nil - } - - w.initial = false - - ctx := w.Context() - - lockKey := w.limiterMiddleware.getLockKey(ctx) - if lockKey == "" { - return nil - } - - limiter := w.limiterMiddleware.methodLimiters[w.info.FullMethod] - if limiter == nil { - // No concurrency limiting - return nil - } - - ready := make(chan struct{}) - go limiter.Limit(ctx, lockKey, func() (interface{}, error) { - close(ready) - <-ctx.Done() - return nil, nil - }) - - select { - case <-ctx.Done(): - return ctx.Err() - case <-ready: - // It's our turn! - return nil - } -} - -// New creates a new rate limiter -func New(getLockKey GetLockKey) LimiterMiddleware { - return LimiterMiddleware{ - methodLimiters: createLimiterConfig(), - getLockKey: getLockKey, - } -} - -func createLimiterConfig() map[string]*ConcurrencyLimiter { - result := make(map[string]*ConcurrencyLimiter) - - for fullMethodName, max := range maxConcurrencyPerRepoPerRPC { - result[fullMethodName] = NewLimiter(max, NewPromMonitor("gitaly", fullMethodName)) - } - - return result -} - -// SetMaxRepoConcurrency Configures the max concurrency per repo per RPC -func SetMaxRepoConcurrency(config map[string]int) { - maxConcurrencyPerRepoPerRPC = config -} diff --git a/internal/middleware/limithandler/metrics.go b/internal/middleware/limithandler/metrics.go deleted file mode 100644 index 307d00944..000000000 --- a/internal/middleware/limithandler/metrics.go +++ /dev/null @@ -1,107 +0,0 @@ -package limithandler - -import ( - "context" - "strings" - "time" - - "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" -) - -const acquireDurationLogThreshold = 10 * time.Millisecond - -var ( - histogramVec *prometheus.HistogramVec - inprogressGaugeVec = promauto.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "gitaly", - Subsystem: "rate_limiting", - Name: "in_progress", - Help: "Gauge of number of concurrent in-progress calls", - }, - []string{"system", "grpc_service", "grpc_method"}, - ) - - queuedGaugeVec = promauto.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "gitaly", - Subsystem: "rate_limiting", - Name: "queued", - Help: "Gauge of number of queued calls", - }, - []string{"system", "grpc_service", "grpc_method"}, - ) -) - -type promMonitor struct { - queuedGauge prometheus.Gauge - inprogressGauge prometheus.Gauge - histogram prometheus.Observer -} - -func splitMethodName(fullMethodName string) (string, string) { - fullMethodName = strings.TrimPrefix(fullMethodName, "/") // remove leading slash - if i := strings.Index(fullMethodName, "/"); i >= 0 { - return fullMethodName[:i], fullMethodName[i+1:] - } - return "unknown", "unknown" -} - -// EnableAcquireTimeHistogram enables histograms for acquisition times -func EnableAcquireTimeHistogram(buckets []float64) { - histogramOpts := prometheus.HistogramOpts{ - Namespace: "gitaly", - Subsystem: "rate_limiting", - Name: "acquiring_seconds", - Help: "Histogram of time calls are rate limited (in seconds)", - Buckets: buckets, - } - - histogramVec = promauto.NewHistogramVec( - histogramOpts, - []string{"system", "grpc_service", "grpc_method"}, - ) -} - -func (c *promMonitor) Queued(ctx context.Context) { - c.queuedGauge.Inc() -} - -func (c *promMonitor) Dequeued(ctx context.Context) { - c.queuedGauge.Dec() -} - -func (c *promMonitor) Enter(ctx context.Context, acquireTime time.Duration) { - c.inprogressGauge.Inc() - - if acquireTime > acquireDurationLogThreshold { - logger := ctxlogrus.Extract(ctx) - logger.WithField("acquire_ms", acquireTime.Seconds()*1000).Info("Rate limit acquire wait") - } - - if c.histogram != nil { - c.histogram.Observe(acquireTime.Seconds()) - } -} - -func (c *promMonitor) Exit(ctx context.Context) { - c.inprogressGauge.Dec() -} - -// NewPromMonitor creates a new ConcurrencyMonitor that tracks limiter -// activity in Prometheus. -func NewPromMonitor(system string, fullMethod string) ConcurrencyMonitor { - serviceName, methodName := splitMethodName(fullMethod) - - queuedGauge := queuedGaugeVec.WithLabelValues(system, serviceName, methodName) - inprogressGauge := inprogressGaugeVec.WithLabelValues(system, serviceName, methodName) - - var histogram prometheus.Observer - if histogramVec != nil { - histogram = histogramVec.WithLabelValues(system, serviceName, methodName) - } - - return &promMonitor{queuedGauge, inprogressGauge, histogram} -} diff --git a/internal/middleware/limithandler/middleware.go b/internal/middleware/limithandler/middleware.go new file mode 100644 index 000000000..febc25576 --- /dev/null +++ b/internal/middleware/limithandler/middleware.go @@ -0,0 +1,185 @@ +package limithandler + +import ( + "context" + + "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" + grpcmwtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" + "github.com/prometheus/client_golang/prometheus" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" + "google.golang.org/grpc" +) + +// GetLockKey function defines the lock key of an RPC invocation based on its context +type GetLockKey func(context.Context) string + +// LimitConcurrencyByRepo implements GetLockKey by using the repository path as lock. +func LimitConcurrencyByRepo(ctx context.Context) string { + tags := grpcmwtags.Extract(ctx) + ctxValue := tags.Values()["grpc.request.repoPath"] + if ctxValue == nil { + return "" + } + + s, ok := ctxValue.(string) + if ok { + return s + } + + return "" +} + +// LimiterMiddleware contains rate limiter state +type LimiterMiddleware struct { + methodLimiters map[string]*ConcurrencyLimiter + getLockKey GetLockKey + + acquiringSecondsMetric *prometheus.HistogramVec + inProgressMetric *prometheus.GaugeVec + queuedMetric *prometheus.GaugeVec +} + +// New creates a new rate limiter +func New(cfg config.Cfg, getLockKey GetLockKey) *LimiterMiddleware { + middleware := &LimiterMiddleware{ + getLockKey: getLockKey, + + acquiringSecondsMetric: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "gitaly", + Subsystem: "rate_limiting", + Name: "acquiring_seconds", + Help: "Histogram of time calls are rate limited (in seconds)", + Buckets: cfg.Prometheus.GRPCLatencyBuckets, + }, + []string{"system", "grpc_service", "grpc_method"}, + ), + inProgressMetric: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "gitaly", + Subsystem: "rate_limiting", + Name: "in_progress", + Help: "Gauge of number of concurrent in-progress calls", + }, + []string{"system", "grpc_service", "grpc_method"}, + ), + queuedMetric: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "gitaly", + Subsystem: "rate_limiting", + Name: "queued", + Help: "Gauge of number of queued calls", + }, + []string{"system", "grpc_service", "grpc_method"}, + ), + } + middleware.methodLimiters = createLimiterConfig(middleware, cfg) + return middleware +} + +// Describe is used to describe Prometheus metrics. +func (c *LimiterMiddleware) Describe(descs chan<- *prometheus.Desc) { + prometheus.DescribeByCollect(c, descs) +} + +// Collect is used to collect Prometheus metrics. +func (c *LimiterMiddleware) Collect(metrics chan<- prometheus.Metric) { + c.acquiringSecondsMetric.Collect(metrics) + c.inProgressMetric.Collect(metrics) + c.queuedMetric.Collect(metrics) +} + +// UnaryInterceptor returns a Unary Interceptor +func (c *LimiterMiddleware) UnaryInterceptor() grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + lockKey := c.getLockKey(ctx) + if lockKey == "" { + return handler(ctx, req) + } + + limiter := c.methodLimiters[info.FullMethod] + if limiter == nil { + // No concurrency limiting + return handler(ctx, req) + } + + return limiter.Limit(ctx, lockKey, func() (interface{}, error) { + return handler(ctx, req) + }) + } +} + +// StreamInterceptor returns a Stream Interceptor +func (c *LimiterMiddleware) StreamInterceptor() grpc.StreamServerInterceptor { + return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + wrapper := &wrappedStream{stream, info, c, true} + return handler(srv, wrapper) + } +} + +func createLimiterConfig(middleware *LimiterMiddleware, cfg config.Cfg) map[string]*ConcurrencyLimiter { + result := make(map[string]*ConcurrencyLimiter) + for _, limit := range cfg.Concurrency { + result[limit.RPC] = NewLimiter(limit.MaxPerRepo, newPromMonitor(middleware, "gitaly", limit.RPC)) + } + + // Set default for ReplicateRepository. + replicateRepositoryFullMethod := "/gitaly.RepositoryService/ReplicateRepository" + if _, ok := result[replicateRepositoryFullMethod]; !ok { + result[replicateRepositoryFullMethod] = NewLimiter(1, newPromMonitor(middleware, "gitaly", replicateRepositoryFullMethod)) + } + + return result +} + +type wrappedStream struct { + grpc.ServerStream + info *grpc.StreamServerInfo + limiterMiddleware *LimiterMiddleware + initial bool +} + +func (w *wrappedStream) RecvMsg(m interface{}) error { + if err := w.ServerStream.RecvMsg(m); err != nil { + return err + } + + // Only perform limiting on the first request of a stream + if !w.initial { + return nil + } + + w.initial = false + + ctx := w.Context() + + lockKey := w.limiterMiddleware.getLockKey(ctx) + if lockKey == "" { + return nil + } + + limiter := w.limiterMiddleware.methodLimiters[w.info.FullMethod] + if limiter == nil { + // No concurrency limiting + return nil + } + + ready := make(chan struct{}) + go func() { + if _, err := limiter.Limit(ctx, lockKey, func() (interface{}, error) { + close(ready) + <-ctx.Done() + return nil, nil + }); err != nil { + ctxlogrus.Extract(ctx).WithError(err).Error("rate limiting streaming request") + } + }() + + select { + case <-ctx.Done(): + return ctx.Err() + case <-ready: + // It's our turn! + return nil + } +} diff --git a/internal/middleware/limithandler/limithandler_test.go b/internal/middleware/limithandler/middleware_test.go index 84df9f150..16926c004 100644 --- a/internal/middleware/limithandler/limithandler_test.go +++ b/internal/middleware/limithandler/middleware_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler" pb "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler/testdata" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" @@ -24,10 +25,17 @@ func fixedLockKey(ctx context.Context) string { } func TestUnaryLimitHandler(t *testing.T) { + t.Parallel() + s := &server{blockCh: make(chan struct{})} - limithandler.SetMaxRepoConcurrency(map[string]int{"/test.limithandler.Test/Unary": 2}) - lh := limithandler.New(fixedLockKey) + cfg := config.Cfg{ + Concurrency: []config.Concurrency{ + {RPC: "/test.limithandler.Test/Unary", MaxPerRepo: 2}, + }, + } + + lh := limithandler.New(cfg, fixedLockKey) interceptor := lh.UnaryInterceptor() srv, serverSocketPath := runServer(t, s, grpc.UnaryInterceptor(interceptor)) defer srv.Stop() @@ -64,6 +72,8 @@ func TestUnaryLimitHandler(t *testing.T) { } func TestStreamLimitHandler(t *testing.T) { + t.Parallel() + testCases := []struct { desc string fullname string @@ -170,13 +180,17 @@ func TestStreamLimitHandler(t *testing.T) { for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + s := &server{blockCh: make(chan struct{})} - limithandler.SetMaxRepoConcurrency(map[string]int{ - tc.fullname: tc.maxConcurrency, - }) + cfg := config.Cfg{ + Concurrency: []config.Concurrency{ + {RPC: tc.fullname, MaxPerRepo: tc.maxConcurrency}, + }, + } - lh := limithandler.New(fixedLockKey) + lh := limithandler.New(cfg, fixedLockKey) interceptor := lh.StreamInterceptor() srv, serverSocketPath := runServer(t, s, grpc.StreamInterceptor(interceptor)) defer srv.Stop() diff --git a/internal/middleware/limithandler/monitor.go b/internal/middleware/limithandler/monitor.go new file mode 100644 index 000000000..40c3869dc --- /dev/null +++ b/internal/middleware/limithandler/monitor.go @@ -0,0 +1,76 @@ +package limithandler + +import ( + "context" + "strings" + "time" + + "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" + "github.com/prometheus/client_golang/prometheus" +) + +const acquireDurationLogThreshold = 10 * time.Millisecond + +// ConcurrencyMonitor allows the concurrency monitor to be observed +type ConcurrencyMonitor interface { + Queued(ctx context.Context) + Dequeued(ctx context.Context) + Enter(ctx context.Context, acquireTime time.Duration) + Exit(ctx context.Context) +} + +type nullConcurrencyMonitor struct{} + +func (c *nullConcurrencyMonitor) Queued(ctx context.Context) {} +func (c *nullConcurrencyMonitor) Dequeued(ctx context.Context) {} +func (c *nullConcurrencyMonitor) Enter(ctx context.Context, acquireTime time.Duration) {} +func (c *nullConcurrencyMonitor) Exit(ctx context.Context) {} + +type promMonitor struct { + queuedMetric prometheus.Gauge + inProgressMetric prometheus.Gauge + acquiringSecondsMetric prometheus.Observer +} + +// newPromMonitor creates a new ConcurrencyMonitor that tracks limiter +// activity in Prometheus. +func newPromMonitor(lh *LimiterMiddleware, system string, fullMethod string) ConcurrencyMonitor { + serviceName, methodName := splitMethodName(fullMethod) + + return &promMonitor{ + lh.queuedMetric.WithLabelValues(system, serviceName, methodName), + lh.inProgressMetric.WithLabelValues(system, serviceName, methodName), + lh.acquiringSecondsMetric.WithLabelValues(system, serviceName, methodName), + } +} + +func (c *promMonitor) Queued(ctx context.Context) { + c.queuedMetric.Inc() +} + +func (c *promMonitor) Dequeued(ctx context.Context) { + c.queuedMetric.Dec() +} + +func (c *promMonitor) Enter(ctx context.Context, acquireTime time.Duration) { + c.inProgressMetric.Inc() + + if acquireTime > acquireDurationLogThreshold { + logger := ctxlogrus.Extract(ctx) + logger.WithField("acquire_ms", acquireTime.Seconds()*1000).Info("Rate limit acquire wait") + } + + c.acquiringSecondsMetric.Observe(acquireTime.Seconds()) +} + +func (c *promMonitor) Exit(ctx context.Context) { + c.inProgressMetric.Dec() +} + +func splitMethodName(fullMethodName string) (string, string) { + fullMethodName = strings.TrimPrefix(fullMethodName, "/") // remove leading slash + if i := strings.Index(fullMethodName, "/"); i >= 0 { + return fullMethodName[:i], fullMethodName[i+1:] + } + return "unknown", "unknown" +} diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index 0e48d6622..ed1acf67d 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -30,6 +30,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v14/internal/gitlab" + "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler" praefectconfig "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/streamcache" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" @@ -219,6 +220,7 @@ func runGitaly(t testing.TB, cfg config.Cfg, rubyServer *rubyserver.Server, regi gsd.logger.WithField("test", t.Name()), deps.GetBackchannelRegistry(), deps.GetDiskCache(), + deps.GetLimitHandler(), ) if cfg.InternalSocketDir != "" { @@ -299,6 +301,7 @@ type gitalyServerDeps struct { catfileCache catfile.Cache diskCache cache.Cache packObjectsCache streamcache.Cache + limitHandler *limithandler.LimiterMiddleware } func (gsd *gitalyServerDeps) createDependencies(t testing.TB, cfg config.Cfg, rubyServer *rubyserver.Server) *service.Dependencies { @@ -357,6 +360,10 @@ func (gsd *gitalyServerDeps) createDependencies(t testing.TB, cfg config.Cfg, ru t.Cleanup(gsd.packObjectsCache.Stop) } + if gsd.limitHandler == nil { + gsd.limitHandler = limithandler.New(cfg, limithandler.LimitConcurrencyByRepo) + } + return &service.Dependencies{ Cfg: cfg, RubyServer: rubyServer, @@ -371,6 +378,7 @@ func (gsd *gitalyServerDeps) createDependencies(t testing.TB, cfg config.Cfg, ru CatfileCache: gsd.catfileCache, DiskCache: gsd.diskCache, PackObjectsCache: gsd.packObjectsCache, + LimitHandler: gsd.limitHandler, } } |