Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2021-12-16 21:38:14 +0300
committerJohn Cai <jcai@gitlab.com>2021-12-16 21:38:14 +0300
commit61471fff6a262bc2c9c5270016f8a677e48638bb (patch)
tree2ce78c49318abb1916885bde6f883c5c66e64ce7 /internal
parent1762b3a49fef1f12c2dc6ac153807b2c8c25b1fe (diff)
parent2766b879b037ad07e4313e76debbede691b40000 (diff)
Merge branch 'pks-limithandler-remove-globals' into 'master'
limithandler: Refactor package to not use global state See merge request gitlab-org/gitaly!4197
Diffstat (limited to 'internal')
-rw-r--r--internal/gitaly/config/concurrency.go22
-rw-r--r--internal/gitaly/config/prometheus/config.go3
-rw-r--r--internal/gitaly/server/auth_test.go15
-rw-r--r--internal/gitaly/server/server.go23
-rw-r--r--internal/gitaly/server/server_factory.go8
-rw-r--r--internal/gitaly/server/server_factory_test.go41
-rw-r--r--internal/gitaly/service/dependencies.go7
-rw-r--r--internal/gitaly/service/repository/create_fork_test.go4
-rw-r--r--internal/middleware/limithandler/concurrency_limiter.go15
-rw-r--r--internal/middleware/limithandler/concurrency_limiter_test.go5
-rw-r--r--internal/middleware/limithandler/limithandler.go117
-rw-r--r--internal/middleware/limithandler/metrics.go107
-rw-r--r--internal/middleware/limithandler/middleware.go185
-rw-r--r--internal/middleware/limithandler/middleware_test.go (renamed from internal/middleware/limithandler/limithandler_test.go)26
-rw-r--r--internal/middleware/limithandler/monitor.go76
-rw-r--r--internal/testhelper/testserver/gitaly.go8
16 files changed, 360 insertions, 302 deletions
diff --git a/internal/gitaly/config/concurrency.go b/internal/gitaly/config/concurrency.go
deleted file mode 100644
index 1c54a0250..000000000
--- a/internal/gitaly/config/concurrency.go
+++ /dev/null
@@ -1,22 +0,0 @@
-package config
-
-import (
- "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler"
-)
-
-// ConfigureConcurrencyLimits configures the per-repo, per RPC rate limits
-func ConfigureConcurrencyLimits(cfg Cfg) {
- maxConcurrencyPerRepoPerRPC := make(map[string]int)
-
- for _, v := range cfg.Concurrency {
- maxConcurrencyPerRepoPerRPC[v.RPC] = v.MaxPerRepo
- }
-
- // Set default for ReplicateRepository
- replicateRepositoryFullMethod := "/gitaly.RepositoryService/ReplicateRepository"
- if _, ok := maxConcurrencyPerRepoPerRPC[replicateRepositoryFullMethod]; !ok {
- maxConcurrencyPerRepoPerRPC[replicateRepositoryFullMethod] = 1
- }
-
- limithandler.SetMaxRepoConcurrency(maxConcurrencyPerRepoPerRPC)
-}
diff --git a/internal/gitaly/config/prometheus/config.go b/internal/gitaly/config/prometheus/config.go
index 8d3c91fc4..4873eec1d 100644
--- a/internal/gitaly/config/prometheus/config.go
+++ b/internal/gitaly/config/prometheus/config.go
@@ -6,7 +6,6 @@ import (
grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
- "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler"
)
// Config contains additional configuration data for prometheus
@@ -40,6 +39,4 @@ func (c *Config) Configure() {
grpcprometheus.EnableClientHandlingTimeHistogram(func(histogramOpts *prometheus.HistogramOpts) {
histogramOpts.Buckets = c.GRPCLatencyBuckets
})
-
- limithandler.EnableAcquireTimeHistogram(c.GRPCLatencyBuckets)
}
diff --git a/internal/gitaly/server/auth_test.go b/internal/gitaly/server/auth_test.go
index 016527da5..44ffcf08c 100644
--- a/internal/gitaly/server/auth_test.go
+++ b/internal/gitaly/server/auth_test.go
@@ -26,6 +26,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitlab"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
@@ -200,8 +201,9 @@ func runServer(t *testing.T, cfg config.Cfg) string {
catfileCache := catfile.NewCache(cfg)
t.Cleanup(catfileCache.Stop)
diskCache := cache.New(cfg, locator)
+ limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo)
- srv, err := New(false, cfg, testhelper.NewDiscardingLogEntry(t), registry, diskCache)
+ srv, err := New(false, cfg, testhelper.NewDiscardingLogEntry(t), registry, diskCache, limitHandler)
require.NoError(t, err)
setup.RegisterAll(srv, &service.Dependencies{
@@ -235,7 +237,14 @@ func runSecureServer(t *testing.T, cfg config.Cfg) string {
conns := client.NewPool()
t.Cleanup(func() { conns.Close() })
- srv, err := New(true, cfg, testhelper.NewDiscardingLogEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg)))
+ srv, err := New(
+ true,
+ cfg,
+ testhelper.NewDiscardingLogEntry(t),
+ backchannel.NewRegistry(),
+ cache.New(cfg, config.NewLocator(cfg)),
+ limithandler.New(cfg, limithandler.LimitConcurrencyByRepo),
+ )
require.NoError(t, err)
healthpb.RegisterHealthServer(srv, health.NewServer())
@@ -307,8 +316,6 @@ func TestAuthBeforeLimit(t *testing.T) {
},
))
- config.ConfigureConcurrencyLimits(cfg)
-
gitlabURL, cleanup := gitlab.SetupAndStartGitlabServer(t, cfg.GitlabShell.Dir, &gitlab.TestServerOptions{
SecretToken: "secretToken",
GLID: gittest.GlID,
diff --git a/internal/gitaly/server/server.go b/internal/gitaly/server/server.go
index 9dcf07aa4..06a665dcf 100644
--- a/internal/gitaly/server/server.go
+++ b/internal/gitaly/server/server.go
@@ -1,7 +1,6 @@
package server
import (
- "context"
"crypto/tls"
"fmt"
"time"
@@ -37,21 +36,6 @@ import (
"google.golang.org/grpc/keepalive"
)
-func concurrencyKeyFn(ctx context.Context) string {
- tags := grpcmwtags.Extract(ctx)
- ctxValue := tags.Values()["grpc.request.repoPath"]
- if ctxValue == nil {
- return ""
- }
-
- s, ok := ctxValue.(string)
- if ok {
- return s
- }
-
- return ""
-}
-
func init() {
for _, l := range gitalylog.Loggers {
urlSanitizer := logsanitizer.NewURLSanitizerHook()
@@ -75,13 +59,12 @@ func New(
logrusEntry *log.Entry,
registry *backchannel.Registry,
cacheInvalidator diskcache.Invalidator,
+ limitHandler *limithandler.LimiterMiddleware,
) (*grpc.Server, error) {
ctxTagOpts := []grpcmwtags.Option{
grpcmwtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor),
}
- lh := limithandler.New(concurrencyKeyFn)
-
transportCredentials := insecure.NewCredentials()
// If tls config is specified attempt to extract tls options and use it
// as a grpc.ServerOption
@@ -132,7 +115,7 @@ func New(
sentryhandler.StreamLogHandler,
cancelhandler.Stream, // Should be below LogHandler
auth.StreamServerInterceptor(cfg.Auth),
- lh.StreamInterceptor(), // Should be below auth handler to prevent v2 hmac tokens from timing out while queued
+ limitHandler.StreamInterceptor(), // Should be below auth handler to prevent v2 hmac tokens from timing out while queued
grpctracing.StreamServerTracingInterceptor(),
cache.StreamInvalidator(cacheInvalidator, protoregistry.GitalyProtoPreregistered),
// Panic handler should remain last so that application panics will be
@@ -153,7 +136,7 @@ func New(
sentryhandler.UnaryLogHandler,
cancelhandler.Unary, // Should be below LogHandler
auth.UnaryServerInterceptor(cfg.Auth),
- lh.UnaryInterceptor(), // Should be below auth handler to prevent v2 hmac tokens from timing out while queued
+ limitHandler.UnaryInterceptor(), // Should be below auth handler to prevent v2 hmac tokens from timing out while queued
grpctracing.UnaryServerTracingInterceptor(),
cache.UnaryInvalidator(cacheInvalidator, protoregistry.GitalyProtoPreregistered),
// Panic handler should remain last so that application panics will be
diff --git a/internal/gitaly/server/server_factory.go b/internal/gitaly/server/server_factory.go
index 2b2d9e73d..279ad9f70 100644
--- a/internal/gitaly/server/server_factory.go
+++ b/internal/gitaly/server/server_factory.go
@@ -15,6 +15,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/maintenance"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"google.golang.org/grpc"
)
@@ -23,6 +24,7 @@ import (
type GitalyServerFactory struct {
registry *backchannel.Registry
cacheInvalidator cache.Invalidator
+ limitHandler *limithandler.LimiterMiddleware
cfg config.Cfg
logger *logrus.Entry
externalServers []*grpc.Server
@@ -36,12 +38,14 @@ func NewGitalyServerFactory(
logger *logrus.Entry,
registry *backchannel.Registry,
cacheInvalidator cache.Invalidator,
+ limitHandler *limithandler.LimiterMiddleware,
) *GitalyServerFactory {
return &GitalyServerFactory{
cfg: cfg,
logger: logger,
registry: registry,
cacheInvalidator: cacheInvalidator,
+ limitHandler: limitHandler,
}
}
@@ -136,7 +140,7 @@ func (s *GitalyServerFactory) GracefulStop() {
// CreateExternal creates a new external gRPC server. The external servers are closed
// before the internal servers when gracefully shutting down.
func (s *GitalyServerFactory) CreateExternal(secure bool) (*grpc.Server, error) {
- server, err := New(secure, s.cfg, s.logger, s.registry, s.cacheInvalidator)
+ server, err := New(secure, s.cfg, s.logger, s.registry, s.cacheInvalidator, s.limitHandler)
if err != nil {
return nil, err
}
@@ -148,7 +152,7 @@ func (s *GitalyServerFactory) CreateExternal(secure bool) (*grpc.Server, error)
// CreateInternal creates a new internal gRPC server. Internal servers are closed
// after the external ones when gracefully shutting down.
func (s *GitalyServerFactory) CreateInternal() (*grpc.Server, error) {
- server, err := New(false, s.cfg, s.logger, s.registry, s.cacheInvalidator)
+ server, err := New(false, s.cfg, s.logger, s.registry, s.cacheInvalidator, s.limitHandler)
if err != nil {
return nil, err
}
diff --git a/internal/gitaly/server/server_factory_test.go b/internal/gitaly/server/server_factory_test.go
index eed519d4d..6e29b9cd3 100644
--- a/internal/gitaly/server/server_factory_test.go
+++ b/internal/gitaly/server/server_factory_test.go
@@ -18,6 +18,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/bootstrap/starter"
"gitlab.com/gitlab-org/gitaly/v14/internal/cache"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
"google.golang.org/grpc"
@@ -88,7 +89,13 @@ func TestGitalyServerFactory(t *testing.T) {
t.Run("insecure", func(t *testing.T) {
cfg := testcfg.Build(t)
- sf := NewGitalyServerFactory(cfg, testhelper.NewDiscardingLogEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg)))
+ sf := NewGitalyServerFactory(
+ cfg,
+ testhelper.NewDiscardingLogEntry(t),
+ backchannel.NewRegistry(),
+ cache.New(cfg, config.NewLocator(cfg)),
+ limithandler.New(cfg, limithandler.LimitConcurrencyByRepo),
+ )
checkHealth(t, sf, starter.TCP, "localhost:0")
})
@@ -101,7 +108,13 @@ func TestGitalyServerFactory(t *testing.T) {
KeyPath: keyFile,
}}))
- sf := NewGitalyServerFactory(cfg, testhelper.NewDiscardingLogEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg)))
+ sf := NewGitalyServerFactory(
+ cfg,
+ testhelper.NewDiscardingLogEntry(t),
+ backchannel.NewRegistry(),
+ cache.New(cfg, config.NewLocator(cfg)),
+ limithandler.New(cfg, limithandler.LimitConcurrencyByRepo),
+ )
t.Cleanup(sf.Stop)
checkHealth(t, sf, starter.TLS, "localhost:0")
@@ -109,7 +122,13 @@ func TestGitalyServerFactory(t *testing.T) {
t.Run("all services must be stopped", func(t *testing.T) {
cfg := testcfg.Build(t)
- sf := NewGitalyServerFactory(cfg, testhelper.NewDiscardingLogEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg)))
+ sf := NewGitalyServerFactory(
+ cfg,
+ testhelper.NewDiscardingLogEntry(t),
+ backchannel.NewRegistry(),
+ cache.New(cfg, config.NewLocator(cfg)),
+ limithandler.New(cfg, limithandler.LimitConcurrencyByRepo),
+ )
t.Cleanup(sf.Stop)
tcpHealthClient := checkHealth(t, sf, starter.TCP, "localhost:0")
@@ -131,7 +150,13 @@ func TestGitalyServerFactory(t *testing.T) {
t.Run("logging check", func(t *testing.T) {
cfg := testcfg.Build(t)
logger, hook := test.NewNullLogger()
- sf := NewGitalyServerFactory(cfg, logger.WithContext(ctx), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg)))
+ sf := NewGitalyServerFactory(
+ cfg,
+ logger.WithContext(ctx),
+ backchannel.NewRegistry(),
+ cache.New(cfg, config.NewLocator(cfg)),
+ limithandler.New(cfg, limithandler.LimitConcurrencyByRepo),
+ )
checkHealth(t, sf, starter.TCP, "localhost:0")
@@ -160,7 +185,13 @@ func TestGitalyServerFactory_closeOrder(t *testing.T) {
defer cancel()
cfg := testcfg.Build(t)
- sf := NewGitalyServerFactory(cfg, testhelper.NewDiscardingLogEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg)))
+ sf := NewGitalyServerFactory(
+ cfg,
+ testhelper.NewDiscardingLogEntry(t),
+ backchannel.NewRegistry(),
+ cache.New(cfg, config.NewLocator(cfg)),
+ limithandler.New(cfg, limithandler.LimitConcurrencyByRepo),
+ )
defer sf.Stop()
errQuickRPC := status.Error(codes.Internal, "quick RPC")
diff --git a/internal/gitaly/service/dependencies.go b/internal/gitaly/service/dependencies.go
index dcdec1783..3228f5e4b 100644
--- a/internal/gitaly/service/dependencies.go
+++ b/internal/gitaly/service/dependencies.go
@@ -13,6 +13,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitlab"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler"
"gitlab.com/gitlab-org/gitaly/v14/internal/streamcache"
)
@@ -31,6 +32,7 @@ type Dependencies struct {
CatfileCache catfile.Cache
DiskCache cache.Cache
PackObjectsCache streamcache.Cache
+ LimitHandler *limithandler.LimiterMiddleware
}
// GetCfg returns service configuration.
@@ -97,3 +99,8 @@ func (dc *Dependencies) GetDiskCache() cache.Cache {
func (dc *Dependencies) GetPackObjectsCache() streamcache.Cache {
return dc.PackObjectsCache
}
+
+// GetLimitHandler returns the RPC limit handler.
+func (dc *Dependencies) GetLimitHandler() *limithandler.LimiterMiddleware {
+ return dc.LimitHandler
+}
diff --git a/internal/gitaly/service/repository/create_fork_test.go b/internal/gitaly/service/repository/create_fork_test.go
index dca8986e4..ec8721697 100644
--- a/internal/gitaly/service/repository/create_fork_test.go
+++ b/internal/gitaly/service/repository/create_fork_test.go
@@ -32,6 +32,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper/text"
"gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/praefectutil"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
@@ -270,7 +271,8 @@ func runSecureServer(t *testing.T, cfg config.Cfg, rubySrv *rubyserver.Server) s
registry := backchannel.NewRegistry()
locator := config.NewLocator(cfg)
cache := cache.New(cfg, locator)
- server, err := gserver.New(true, cfg, testhelper.NewDiscardingLogEntry(t), registry, cache)
+ limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo)
+ server, err := gserver.New(true, cfg, testhelper.NewDiscardingLogEntry(t), registry, cache, limitHandler)
require.NoError(t, err)
listener, addr := testhelper.GetLocalhostListener(t)
diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go
index ab3a1cf3e..7088c7077 100644
--- a/internal/middleware/limithandler/concurrency_limiter.go
+++ b/internal/middleware/limithandler/concurrency_limiter.go
@@ -10,14 +10,6 @@ import (
// LimitedFunc represents a function that will be limited
type LimitedFunc func() (resp interface{}, err error)
-// ConcurrencyMonitor allows the concurrency monitor to be observed
-type ConcurrencyMonitor interface {
- Queued(ctx context.Context)
- Dequeued(ctx context.Context)
- Enter(ctx context.Context, acquireTime time.Duration)
- Exit(ctx context.Context)
-}
-
// ConcurrencyLimiter contains rate limiter state
type ConcurrencyLimiter struct {
semaphores map[string]*semaphoreReference
@@ -119,10 +111,3 @@ func NewLimiter(max int, monitor ConcurrencyMonitor) *ConcurrencyLimiter {
monitor: monitor,
}
}
-
-type nullConcurrencyMonitor struct{}
-
-func (c *nullConcurrencyMonitor) Queued(ctx context.Context) {}
-func (c *nullConcurrencyMonitor) Dequeued(ctx context.Context) {}
-func (c *nullConcurrencyMonitor) Enter(ctx context.Context, acquireTime time.Duration) {}
-func (c *nullConcurrencyMonitor) Exit(ctx context.Context) {}
diff --git a/internal/middleware/limithandler/concurrency_limiter_test.go b/internal/middleware/limithandler/concurrency_limiter_test.go
index bff19c688..638e22b89 100644
--- a/internal/middleware/limithandler/concurrency_limiter_test.go
+++ b/internal/middleware/limithandler/concurrency_limiter_test.go
@@ -70,6 +70,8 @@ func (c *counter) Exit(ctx context.Context) {
}
func TestLimiter(t *testing.T) {
+ t.Parallel()
+
tests := []struct {
name string
concurrency int
@@ -123,7 +125,10 @@ func TestLimiter(t *testing.T) {
},
}
for _, tt := range tests {
+ tt := tt
t.Run(tt.name, func(t *testing.T) {
+ t.Parallel()
+
ctx, cancel := testhelper.Context()
defer cancel()
diff --git a/internal/middleware/limithandler/limithandler.go b/internal/middleware/limithandler/limithandler.go
deleted file mode 100644
index 7d8d6ab8f..000000000
--- a/internal/middleware/limithandler/limithandler.go
+++ /dev/null
@@ -1,117 +0,0 @@
-package limithandler
-
-import (
- "context"
-
- "google.golang.org/grpc"
-)
-
-// GetLockKey function defines the lock key of an RPC invocation based on its context
-type GetLockKey func(context.Context) string
-
-// LimiterMiddleware contains rate limiter state
-type LimiterMiddleware struct {
- methodLimiters map[string]*ConcurrencyLimiter
- getLockKey GetLockKey
-}
-
-type wrappedStream struct {
- grpc.ServerStream
- info *grpc.StreamServerInfo
- limiterMiddleware *LimiterMiddleware
- initial bool
-}
-
-var maxConcurrencyPerRepoPerRPC map[string]int
-
-// UnaryInterceptor returns a Unary Interceptor
-func (c *LimiterMiddleware) UnaryInterceptor() grpc.UnaryServerInterceptor {
- return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
- lockKey := c.getLockKey(ctx)
- if lockKey == "" {
- return handler(ctx, req)
- }
-
- limiter := c.methodLimiters[info.FullMethod]
- if limiter == nil {
- // No concurrency limiting
- return handler(ctx, req)
- }
-
- return limiter.Limit(ctx, lockKey, func() (interface{}, error) {
- return handler(ctx, req)
- })
- }
-}
-
-// StreamInterceptor returns a Stream Interceptor
-func (c *LimiterMiddleware) StreamInterceptor() grpc.StreamServerInterceptor {
- return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
- wrapper := &wrappedStream{stream, info, c, true}
- return handler(srv, wrapper)
- }
-}
-
-func (w *wrappedStream) RecvMsg(m interface{}) error {
- if err := w.ServerStream.RecvMsg(m); err != nil {
- return err
- }
-
- // Only perform limiting on the first request of a stream
- if !w.initial {
- return nil
- }
-
- w.initial = false
-
- ctx := w.Context()
-
- lockKey := w.limiterMiddleware.getLockKey(ctx)
- if lockKey == "" {
- return nil
- }
-
- limiter := w.limiterMiddleware.methodLimiters[w.info.FullMethod]
- if limiter == nil {
- // No concurrency limiting
- return nil
- }
-
- ready := make(chan struct{})
- go limiter.Limit(ctx, lockKey, func() (interface{}, error) {
- close(ready)
- <-ctx.Done()
- return nil, nil
- })
-
- select {
- case <-ctx.Done():
- return ctx.Err()
- case <-ready:
- // It's our turn!
- return nil
- }
-}
-
-// New creates a new rate limiter
-func New(getLockKey GetLockKey) LimiterMiddleware {
- return LimiterMiddleware{
- methodLimiters: createLimiterConfig(),
- getLockKey: getLockKey,
- }
-}
-
-func createLimiterConfig() map[string]*ConcurrencyLimiter {
- result := make(map[string]*ConcurrencyLimiter)
-
- for fullMethodName, max := range maxConcurrencyPerRepoPerRPC {
- result[fullMethodName] = NewLimiter(max, NewPromMonitor("gitaly", fullMethodName))
- }
-
- return result
-}
-
-// SetMaxRepoConcurrency Configures the max concurrency per repo per RPC
-func SetMaxRepoConcurrency(config map[string]int) {
- maxConcurrencyPerRepoPerRPC = config
-}
diff --git a/internal/middleware/limithandler/metrics.go b/internal/middleware/limithandler/metrics.go
deleted file mode 100644
index 307d00944..000000000
--- a/internal/middleware/limithandler/metrics.go
+++ /dev/null
@@ -1,107 +0,0 @@
-package limithandler
-
-import (
- "context"
- "strings"
- "time"
-
- "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
- "github.com/prometheus/client_golang/prometheus"
- "github.com/prometheus/client_golang/prometheus/promauto"
-)
-
-const acquireDurationLogThreshold = 10 * time.Millisecond
-
-var (
- histogramVec *prometheus.HistogramVec
- inprogressGaugeVec = promauto.NewGaugeVec(
- prometheus.GaugeOpts{
- Namespace: "gitaly",
- Subsystem: "rate_limiting",
- Name: "in_progress",
- Help: "Gauge of number of concurrent in-progress calls",
- },
- []string{"system", "grpc_service", "grpc_method"},
- )
-
- queuedGaugeVec = promauto.NewGaugeVec(
- prometheus.GaugeOpts{
- Namespace: "gitaly",
- Subsystem: "rate_limiting",
- Name: "queued",
- Help: "Gauge of number of queued calls",
- },
- []string{"system", "grpc_service", "grpc_method"},
- )
-)
-
-type promMonitor struct {
- queuedGauge prometheus.Gauge
- inprogressGauge prometheus.Gauge
- histogram prometheus.Observer
-}
-
-func splitMethodName(fullMethodName string) (string, string) {
- fullMethodName = strings.TrimPrefix(fullMethodName, "/") // remove leading slash
- if i := strings.Index(fullMethodName, "/"); i >= 0 {
- return fullMethodName[:i], fullMethodName[i+1:]
- }
- return "unknown", "unknown"
-}
-
-// EnableAcquireTimeHistogram enables histograms for acquisition times
-func EnableAcquireTimeHistogram(buckets []float64) {
- histogramOpts := prometheus.HistogramOpts{
- Namespace: "gitaly",
- Subsystem: "rate_limiting",
- Name: "acquiring_seconds",
- Help: "Histogram of time calls are rate limited (in seconds)",
- Buckets: buckets,
- }
-
- histogramVec = promauto.NewHistogramVec(
- histogramOpts,
- []string{"system", "grpc_service", "grpc_method"},
- )
-}
-
-func (c *promMonitor) Queued(ctx context.Context) {
- c.queuedGauge.Inc()
-}
-
-func (c *promMonitor) Dequeued(ctx context.Context) {
- c.queuedGauge.Dec()
-}
-
-func (c *promMonitor) Enter(ctx context.Context, acquireTime time.Duration) {
- c.inprogressGauge.Inc()
-
- if acquireTime > acquireDurationLogThreshold {
- logger := ctxlogrus.Extract(ctx)
- logger.WithField("acquire_ms", acquireTime.Seconds()*1000).Info("Rate limit acquire wait")
- }
-
- if c.histogram != nil {
- c.histogram.Observe(acquireTime.Seconds())
- }
-}
-
-func (c *promMonitor) Exit(ctx context.Context) {
- c.inprogressGauge.Dec()
-}
-
-// NewPromMonitor creates a new ConcurrencyMonitor that tracks limiter
-// activity in Prometheus.
-func NewPromMonitor(system string, fullMethod string) ConcurrencyMonitor {
- serviceName, methodName := splitMethodName(fullMethod)
-
- queuedGauge := queuedGaugeVec.WithLabelValues(system, serviceName, methodName)
- inprogressGauge := inprogressGaugeVec.WithLabelValues(system, serviceName, methodName)
-
- var histogram prometheus.Observer
- if histogramVec != nil {
- histogram = histogramVec.WithLabelValues(system, serviceName, methodName)
- }
-
- return &promMonitor{queuedGauge, inprogressGauge, histogram}
-}
diff --git a/internal/middleware/limithandler/middleware.go b/internal/middleware/limithandler/middleware.go
new file mode 100644
index 000000000..febc25576
--- /dev/null
+++ b/internal/middleware/limithandler/middleware.go
@@ -0,0 +1,185 @@
+package limithandler
+
+import (
+ "context"
+
+ "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
+ grpcmwtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
+ "github.com/prometheus/client_golang/prometheus"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
+ "google.golang.org/grpc"
+)
+
+// GetLockKey function defines the lock key of an RPC invocation based on its context
+type GetLockKey func(context.Context) string
+
+// LimitConcurrencyByRepo implements GetLockKey by using the repository path as lock.
+func LimitConcurrencyByRepo(ctx context.Context) string {
+ tags := grpcmwtags.Extract(ctx)
+ ctxValue := tags.Values()["grpc.request.repoPath"]
+ if ctxValue == nil {
+ return ""
+ }
+
+ s, ok := ctxValue.(string)
+ if ok {
+ return s
+ }
+
+ return ""
+}
+
+// LimiterMiddleware contains rate limiter state
+type LimiterMiddleware struct {
+ methodLimiters map[string]*ConcurrencyLimiter
+ getLockKey GetLockKey
+
+ acquiringSecondsMetric *prometheus.HistogramVec
+ inProgressMetric *prometheus.GaugeVec
+ queuedMetric *prometheus.GaugeVec
+}
+
+// New creates a new rate limiter
+func New(cfg config.Cfg, getLockKey GetLockKey) *LimiterMiddleware {
+ middleware := &LimiterMiddleware{
+ getLockKey: getLockKey,
+
+ acquiringSecondsMetric: prometheus.NewHistogramVec(
+ prometheus.HistogramOpts{
+ Namespace: "gitaly",
+ Subsystem: "rate_limiting",
+ Name: "acquiring_seconds",
+ Help: "Histogram of time calls are rate limited (in seconds)",
+ Buckets: cfg.Prometheus.GRPCLatencyBuckets,
+ },
+ []string{"system", "grpc_service", "grpc_method"},
+ ),
+ inProgressMetric: prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Namespace: "gitaly",
+ Subsystem: "rate_limiting",
+ Name: "in_progress",
+ Help: "Gauge of number of concurrent in-progress calls",
+ },
+ []string{"system", "grpc_service", "grpc_method"},
+ ),
+ queuedMetric: prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Namespace: "gitaly",
+ Subsystem: "rate_limiting",
+ Name: "queued",
+ Help: "Gauge of number of queued calls",
+ },
+ []string{"system", "grpc_service", "grpc_method"},
+ ),
+ }
+ middleware.methodLimiters = createLimiterConfig(middleware, cfg)
+ return middleware
+}
+
+// Describe is used to describe Prometheus metrics.
+func (c *LimiterMiddleware) Describe(descs chan<- *prometheus.Desc) {
+ prometheus.DescribeByCollect(c, descs)
+}
+
+// Collect is used to collect Prometheus metrics.
+func (c *LimiterMiddleware) Collect(metrics chan<- prometheus.Metric) {
+ c.acquiringSecondsMetric.Collect(metrics)
+ c.inProgressMetric.Collect(metrics)
+ c.queuedMetric.Collect(metrics)
+}
+
+// UnaryInterceptor returns a Unary Interceptor
+func (c *LimiterMiddleware) UnaryInterceptor() grpc.UnaryServerInterceptor {
+ return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
+ lockKey := c.getLockKey(ctx)
+ if lockKey == "" {
+ return handler(ctx, req)
+ }
+
+ limiter := c.methodLimiters[info.FullMethod]
+ if limiter == nil {
+ // No concurrency limiting
+ return handler(ctx, req)
+ }
+
+ return limiter.Limit(ctx, lockKey, func() (interface{}, error) {
+ return handler(ctx, req)
+ })
+ }
+}
+
+// StreamInterceptor returns a Stream Interceptor
+func (c *LimiterMiddleware) StreamInterceptor() grpc.StreamServerInterceptor {
+ return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+ wrapper := &wrappedStream{stream, info, c, true}
+ return handler(srv, wrapper)
+ }
+}
+
+func createLimiterConfig(middleware *LimiterMiddleware, cfg config.Cfg) map[string]*ConcurrencyLimiter {
+ result := make(map[string]*ConcurrencyLimiter)
+ for _, limit := range cfg.Concurrency {
+ result[limit.RPC] = NewLimiter(limit.MaxPerRepo, newPromMonitor(middleware, "gitaly", limit.RPC))
+ }
+
+ // Set default for ReplicateRepository.
+ replicateRepositoryFullMethod := "/gitaly.RepositoryService/ReplicateRepository"
+ if _, ok := result[replicateRepositoryFullMethod]; !ok {
+ result[replicateRepositoryFullMethod] = NewLimiter(1, newPromMonitor(middleware, "gitaly", replicateRepositoryFullMethod))
+ }
+
+ return result
+}
+
+type wrappedStream struct {
+ grpc.ServerStream
+ info *grpc.StreamServerInfo
+ limiterMiddleware *LimiterMiddleware
+ initial bool
+}
+
+func (w *wrappedStream) RecvMsg(m interface{}) error {
+ if err := w.ServerStream.RecvMsg(m); err != nil {
+ return err
+ }
+
+ // Only perform limiting on the first request of a stream
+ if !w.initial {
+ return nil
+ }
+
+ w.initial = false
+
+ ctx := w.Context()
+
+ lockKey := w.limiterMiddleware.getLockKey(ctx)
+ if lockKey == "" {
+ return nil
+ }
+
+ limiter := w.limiterMiddleware.methodLimiters[w.info.FullMethod]
+ if limiter == nil {
+ // No concurrency limiting
+ return nil
+ }
+
+ ready := make(chan struct{})
+ go func() {
+ if _, err := limiter.Limit(ctx, lockKey, func() (interface{}, error) {
+ close(ready)
+ <-ctx.Done()
+ return nil, nil
+ }); err != nil {
+ ctxlogrus.Extract(ctx).WithError(err).Error("rate limiting streaming request")
+ }
+ }()
+
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-ready:
+ // It's our turn!
+ return nil
+ }
+}
diff --git a/internal/middleware/limithandler/limithandler_test.go b/internal/middleware/limithandler/middleware_test.go
index 84df9f150..16926c004 100644
--- a/internal/middleware/limithandler/limithandler_test.go
+++ b/internal/middleware/limithandler/middleware_test.go
@@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler"
pb "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler/testdata"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
@@ -24,10 +25,17 @@ func fixedLockKey(ctx context.Context) string {
}
func TestUnaryLimitHandler(t *testing.T) {
+ t.Parallel()
+
s := &server{blockCh: make(chan struct{})}
- limithandler.SetMaxRepoConcurrency(map[string]int{"/test.limithandler.Test/Unary": 2})
- lh := limithandler.New(fixedLockKey)
+ cfg := config.Cfg{
+ Concurrency: []config.Concurrency{
+ {RPC: "/test.limithandler.Test/Unary", MaxPerRepo: 2},
+ },
+ }
+
+ lh := limithandler.New(cfg, fixedLockKey)
interceptor := lh.UnaryInterceptor()
srv, serverSocketPath := runServer(t, s, grpc.UnaryInterceptor(interceptor))
defer srv.Stop()
@@ -64,6 +72,8 @@ func TestUnaryLimitHandler(t *testing.T) {
}
func TestStreamLimitHandler(t *testing.T) {
+ t.Parallel()
+
testCases := []struct {
desc string
fullname string
@@ -170,13 +180,17 @@ func TestStreamLimitHandler(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
+ t.Parallel()
+
s := &server{blockCh: make(chan struct{})}
- limithandler.SetMaxRepoConcurrency(map[string]int{
- tc.fullname: tc.maxConcurrency,
- })
+ cfg := config.Cfg{
+ Concurrency: []config.Concurrency{
+ {RPC: tc.fullname, MaxPerRepo: tc.maxConcurrency},
+ },
+ }
- lh := limithandler.New(fixedLockKey)
+ lh := limithandler.New(cfg, fixedLockKey)
interceptor := lh.StreamInterceptor()
srv, serverSocketPath := runServer(t, s, grpc.StreamInterceptor(interceptor))
defer srv.Stop()
diff --git a/internal/middleware/limithandler/monitor.go b/internal/middleware/limithandler/monitor.go
new file mode 100644
index 000000000..40c3869dc
--- /dev/null
+++ b/internal/middleware/limithandler/monitor.go
@@ -0,0 +1,76 @@
+package limithandler
+
+import (
+ "context"
+ "strings"
+ "time"
+
+ "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
+ "github.com/prometheus/client_golang/prometheus"
+)
+
+const acquireDurationLogThreshold = 10 * time.Millisecond
+
+// ConcurrencyMonitor allows the concurrency monitor to be observed
+type ConcurrencyMonitor interface {
+ Queued(ctx context.Context)
+ Dequeued(ctx context.Context)
+ Enter(ctx context.Context, acquireTime time.Duration)
+ Exit(ctx context.Context)
+}
+
+type nullConcurrencyMonitor struct{}
+
+func (c *nullConcurrencyMonitor) Queued(ctx context.Context) {}
+func (c *nullConcurrencyMonitor) Dequeued(ctx context.Context) {}
+func (c *nullConcurrencyMonitor) Enter(ctx context.Context, acquireTime time.Duration) {}
+func (c *nullConcurrencyMonitor) Exit(ctx context.Context) {}
+
+type promMonitor struct {
+ queuedMetric prometheus.Gauge
+ inProgressMetric prometheus.Gauge
+ acquiringSecondsMetric prometheus.Observer
+}
+
+// newPromMonitor creates a new ConcurrencyMonitor that tracks limiter
+// activity in Prometheus.
+func newPromMonitor(lh *LimiterMiddleware, system string, fullMethod string) ConcurrencyMonitor {
+ serviceName, methodName := splitMethodName(fullMethod)
+
+ return &promMonitor{
+ lh.queuedMetric.WithLabelValues(system, serviceName, methodName),
+ lh.inProgressMetric.WithLabelValues(system, serviceName, methodName),
+ lh.acquiringSecondsMetric.WithLabelValues(system, serviceName, methodName),
+ }
+}
+
+func (c *promMonitor) Queued(ctx context.Context) {
+ c.queuedMetric.Inc()
+}
+
+func (c *promMonitor) Dequeued(ctx context.Context) {
+ c.queuedMetric.Dec()
+}
+
+func (c *promMonitor) Enter(ctx context.Context, acquireTime time.Duration) {
+ c.inProgressMetric.Inc()
+
+ if acquireTime > acquireDurationLogThreshold {
+ logger := ctxlogrus.Extract(ctx)
+ logger.WithField("acquire_ms", acquireTime.Seconds()*1000).Info("Rate limit acquire wait")
+ }
+
+ c.acquiringSecondsMetric.Observe(acquireTime.Seconds())
+}
+
+func (c *promMonitor) Exit(ctx context.Context) {
+ c.inProgressMetric.Dec()
+}
+
+func splitMethodName(fullMethodName string) (string, string) {
+ fullMethodName = strings.TrimPrefix(fullMethodName, "/") // remove leading slash
+ if i := strings.Index(fullMethodName, "/"); i >= 0 {
+ return fullMethodName[:i], fullMethodName[i+1:]
+ }
+ return "unknown", "unknown"
+}
diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go
index 0e48d6622..ed1acf67d 100644
--- a/internal/testhelper/testserver/gitaly.go
+++ b/internal/testhelper/testserver/gitaly.go
@@ -30,6 +30,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitlab"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler"
praefectconfig "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/streamcache"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
@@ -219,6 +220,7 @@ func runGitaly(t testing.TB, cfg config.Cfg, rubyServer *rubyserver.Server, regi
gsd.logger.WithField("test", t.Name()),
deps.GetBackchannelRegistry(),
deps.GetDiskCache(),
+ deps.GetLimitHandler(),
)
if cfg.InternalSocketDir != "" {
@@ -299,6 +301,7 @@ type gitalyServerDeps struct {
catfileCache catfile.Cache
diskCache cache.Cache
packObjectsCache streamcache.Cache
+ limitHandler *limithandler.LimiterMiddleware
}
func (gsd *gitalyServerDeps) createDependencies(t testing.TB, cfg config.Cfg, rubyServer *rubyserver.Server) *service.Dependencies {
@@ -357,6 +360,10 @@ func (gsd *gitalyServerDeps) createDependencies(t testing.TB, cfg config.Cfg, ru
t.Cleanup(gsd.packObjectsCache.Stop)
}
+ if gsd.limitHandler == nil {
+ gsd.limitHandler = limithandler.New(cfg, limithandler.LimitConcurrencyByRepo)
+ }
+
return &service.Dependencies{
Cfg: cfg,
RubyServer: rubyServer,
@@ -371,6 +378,7 @@ func (gsd *gitalyServerDeps) createDependencies(t testing.TB, cfg config.Cfg, ru
CatfileCache: gsd.catfileCache,
DiskCache: gsd.diskCache,
PackObjectsCache: gsd.packObjectsCache,
+ LimitHandler: gsd.limitHandler,
}
}