Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2022-03-25 16:13:53 +0300
committerJohn Cai <jcai@gitlab.com>2022-04-06 22:27:35 +0300
commitf4cb81c2dcdfaaa67a2752f7451917f6080db652 (patch)
treef3ede5ddc01a33abd1fec48b70cd95321b5b353f
parent7ce0a1bb170eb89da15a12acf9446f2cb43262a1 (diff)
server: Allow multiple limit handlers
Now that we are adding a second limit handle, adjust the code to allow for multiple limit handlers to be passed into a server invocation.
-rw-r--r--cmd/gitaly-ssh/auth_test.go2
-rw-r--r--cmd/gitaly/main.go2
-rw-r--r--internal/gitaly/server/auth_test.go4
-rw-r--r--internal/gitaly/server/server.go101
-rw-r--r--internal/gitaly/server/server_factory.go23
-rw-r--r--internal/gitaly/server/server_factory_test.go10
-rw-r--r--internal/gitaly/service/repository/create_fork_test.go2
-rw-r--r--internal/testhelper/testserver/gitaly.go2
8 files changed, 85 insertions, 61 deletions
diff --git a/cmd/gitaly-ssh/auth_test.go b/cmd/gitaly-ssh/auth_test.go
index 5b8cb9ee8..070006ce5 100644
--- a/cmd/gitaly-ssh/auth_test.go
+++ b/cmd/gitaly-ssh/auth_test.go
@@ -152,7 +152,7 @@ func runServer(t *testing.T, secure bool, cfg config.Cfg, connectionType string,
))
limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)
diskCache := cache.New(cfg, locator)
- srv, err := server.New(secure, cfg, testhelper.NewDiscardingLogEntry(t), registry, diskCache, limitHandler)
+ srv, err := server.New(secure, cfg, testhelper.NewDiscardingLogEntry(t), registry, diskCache, []*limithandler.LimiterMiddleware{limitHandler})
require.NoError(t, err)
setup.RegisterAll(srv, &service.Dependencies{
Cfg: cfg,
diff --git a/cmd/gitaly/main.go b/cmd/gitaly/main.go
index bba3d4ca8..818f5ce2f 100644
--- a/cmd/gitaly/main.go
+++ b/cmd/gitaly/main.go
@@ -216,7 +216,7 @@ func run(cfg config.Cfg) error {
limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)
prometheus.MustRegister(limitHandler)
- gitalyServerFactory := server.NewGitalyServerFactory(cfg, glog.Default(), registry, diskCache, limitHandler)
+ gitalyServerFactory := server.NewGitalyServerFactory(cfg, glog.Default(), registry, diskCache, []*limithandler.LimiterMiddleware{limitHandler})
defer gitalyServerFactory.Stop()
ling, err := linguist.New(cfg, gitCmdFactory)
diff --git a/internal/gitaly/server/auth_test.go b/internal/gitaly/server/auth_test.go
index b54fca091..78107cbe7 100644
--- a/internal/gitaly/server/auth_test.go
+++ b/internal/gitaly/server/auth_test.go
@@ -203,7 +203,7 @@ func runServer(t *testing.T, cfg config.Cfg) string {
limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)
updaterWithHooks := updateref.NewUpdaterWithHooks(cfg, locator, hookManager, gitCmdFactory, catfileCache)
- srv, err := New(false, cfg, testhelper.NewDiscardingLogEntry(t), registry, diskCache, limitHandler)
+ srv, err := New(false, cfg, testhelper.NewDiscardingLogEntry(t), registry, diskCache, []*limithandler.LimiterMiddleware{limitHandler})
require.NoError(t, err)
setup.RegisterAll(srv, &service.Dependencies{
@@ -244,7 +244,7 @@ func runSecureServer(t *testing.T, cfg config.Cfg) string {
testhelper.NewDiscardingLogEntry(t),
backchannel.NewRegistry(),
cache.New(cfg, config.NewLocator(cfg)),
- limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters),
+ []*limithandler.LimiterMiddleware{limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)},
)
require.NoError(t, err)
diff --git a/internal/gitaly/server/server.go b/internal/gitaly/server/server.go
index 2304c0b8f..ab00acd6e 100644
--- a/internal/gitaly/server/server.go
+++ b/internal/gitaly/server/server.go
@@ -59,7 +59,7 @@ func New(
logrusEntry *log.Entry,
registry *backchannel.Registry,
cacheInvalidator diskcache.Invalidator,
- limitHandler *limithandler.LimiterMiddleware,
+ limitHandlers []*limithandler.LimiterMiddleware,
) (*grpc.Server, error) {
ctxTagOpts := []grpcmwtags.Option{
grpcmwtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor),
@@ -95,56 +95,67 @@ func New(
),
)
+ streamServerInterceptors := []grpc.StreamServerInterceptor{
+ grpcmwtags.StreamServerInterceptor(ctxTagOpts...),
+ grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler
+ metadatahandler.StreamInterceptor,
+ grpcprometheus.StreamServerInterceptor,
+ commandstatshandler.StreamInterceptor,
+ grpcmwlogrus.StreamServerInterceptor(logrusEntry,
+ grpcmwlogrus.WithTimestampFormat(gitalylog.LogTimestampFormat),
+ logMsgProducer,
+ gitalylog.DeciderOption(),
+ ),
+ gitalylog.StreamLogDataCatcherServerInterceptor(),
+ sentryhandler.StreamLogHandler,
+ cancelhandler.Stream, // Should be below LogHandler
+ auth.StreamServerInterceptor(cfg.Auth),
+ }
+ unaryServerInterceptors := []grpc.UnaryServerInterceptor{
+ grpcmwtags.UnaryServerInterceptor(ctxTagOpts...),
+ grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler
+ metadatahandler.UnaryInterceptor,
+ grpcprometheus.UnaryServerInterceptor,
+ commandstatshandler.UnaryInterceptor,
+ grpcmwlogrus.UnaryServerInterceptor(logrusEntry,
+ grpcmwlogrus.WithTimestampFormat(gitalylog.LogTimestampFormat),
+ logMsgProducer,
+ gitalylog.DeciderOption(),
+ ),
+ gitalylog.UnaryLogDataCatcherServerInterceptor(),
+ sentryhandler.UnaryLogHandler,
+ cancelhandler.Unary, // Should be below LogHandler
+ auth.UnaryServerInterceptor(cfg.Auth),
+ }
+ // Should be below auth handler to prevent v2 hmac tokens from timing out while queued
+ for _, limitHandler := range limitHandlers {
+ streamServerInterceptors = append(streamServerInterceptors, limitHandler.StreamInterceptor())
+ unaryServerInterceptors = append(unaryServerInterceptors, limitHandler.UnaryInterceptor())
+ }
+
+ streamServerInterceptors = append(streamServerInterceptors,
+ grpctracing.StreamServerTracingInterceptor(),
+ cache.StreamInvalidator(cacheInvalidator, protoregistry.GitalyProtoPreregistered),
+ // Panic handler should remain last so that application panics will be
+ // converted to errors and logged
+ panichandler.StreamPanicHandler,
+ )
+
+ unaryServerInterceptors = append(unaryServerInterceptors,
+ cache.UnaryInvalidator(cacheInvalidator, protoregistry.GitalyProtoPreregistered),
+ // Panic handler should remain last so that application panics will be
+ // converted to errors and logged
+ panichandler.UnaryPanicHandler,
+ )
+
opts := []grpc.ServerOption{
grpc.StatsHandler(gitalylog.PerRPCLogHandler{
Underlying: &grpcstats.PayloadBytes{},
FieldProducers: []gitalylog.FieldsProducer{grpcstats.FieldsProducer},
}),
grpc.Creds(lm),
- grpc.StreamInterceptor(grpcmw.ChainStreamServer(
- grpcmwtags.StreamServerInterceptor(ctxTagOpts...),
- grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler
- metadatahandler.StreamInterceptor,
- grpcprometheus.StreamServerInterceptor,
- commandstatshandler.StreamInterceptor,
- grpcmwlogrus.StreamServerInterceptor(logrusEntry,
- grpcmwlogrus.WithTimestampFormat(gitalylog.LogTimestampFormat),
- logMsgProducer,
- gitalylog.DeciderOption(),
- ),
- gitalylog.StreamLogDataCatcherServerInterceptor(),
- sentryhandler.StreamLogHandler,
- cancelhandler.Stream, // Should be below LogHandler
- auth.StreamServerInterceptor(cfg.Auth),
- limitHandler.StreamInterceptor(), // Should be below auth handler to prevent v2 hmac tokens from timing out while queued
- grpctracing.StreamServerTracingInterceptor(),
- cache.StreamInvalidator(cacheInvalidator, protoregistry.GitalyProtoPreregistered),
- // Panic handler should remain last so that application panics will be
- // converted to errors and logged
- panichandler.StreamPanicHandler,
- )),
- grpc.UnaryInterceptor(grpcmw.ChainUnaryServer(
- grpcmwtags.UnaryServerInterceptor(ctxTagOpts...),
- grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler
- metadatahandler.UnaryInterceptor,
- grpcprometheus.UnaryServerInterceptor,
- commandstatshandler.UnaryInterceptor,
- grpcmwlogrus.UnaryServerInterceptor(logrusEntry,
- grpcmwlogrus.WithTimestampFormat(gitalylog.LogTimestampFormat),
- logMsgProducer,
- gitalylog.DeciderOption(),
- ),
- gitalylog.UnaryLogDataCatcherServerInterceptor(),
- sentryhandler.UnaryLogHandler,
- cancelhandler.Unary, // Should be below LogHandler
- auth.UnaryServerInterceptor(cfg.Auth),
- limitHandler.UnaryInterceptor(), // Should be below auth handler to prevent v2 hmac tokens from timing out while queued
- grpctracing.UnaryServerTracingInterceptor(),
- cache.UnaryInvalidator(cacheInvalidator, protoregistry.GitalyProtoPreregistered),
- // Panic handler should remain last so that application panics will be
- // converted to errors and logged
- panichandler.UnaryPanicHandler,
- )),
+ grpc.StreamInterceptor(grpcmw.ChainStreamServer(streamServerInterceptors...)),
+ grpc.UnaryInterceptor(grpcmw.ChainUnaryServer(unaryServerInterceptors...)),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: 20 * time.Second,
PermitWithoutStream: true,
diff --git a/internal/gitaly/server/server_factory.go b/internal/gitaly/server/server_factory.go
index 279ad9f70..b561045ff 100644
--- a/internal/gitaly/server/server_factory.go
+++ b/internal/gitaly/server/server_factory.go
@@ -24,7 +24,7 @@ import (
type GitalyServerFactory struct {
registry *backchannel.Registry
cacheInvalidator cache.Invalidator
- limitHandler *limithandler.LimiterMiddleware
+ limitHandlers []*limithandler.LimiterMiddleware
cfg config.Cfg
logger *logrus.Entry
externalServers []*grpc.Server
@@ -38,14 +38,14 @@ func NewGitalyServerFactory(
logger *logrus.Entry,
registry *backchannel.Registry,
cacheInvalidator cache.Invalidator,
- limitHandler *limithandler.LimiterMiddleware,
+ limitHandlers []*limithandler.LimiterMiddleware,
) *GitalyServerFactory {
return &GitalyServerFactory{
cfg: cfg,
logger: logger,
registry: registry,
cacheInvalidator: cacheInvalidator,
- limitHandler: limitHandler,
+ limitHandlers: limitHandlers,
}
}
@@ -140,7 +140,14 @@ func (s *GitalyServerFactory) GracefulStop() {
// CreateExternal creates a new external gRPC server. The external servers are closed
// before the internal servers when gracefully shutting down.
func (s *GitalyServerFactory) CreateExternal(secure bool) (*grpc.Server, error) {
- server, err := New(secure, s.cfg, s.logger, s.registry, s.cacheInvalidator, s.limitHandler)
+ server, err := New(
+ secure,
+ s.cfg,
+ s.logger,
+ s.registry,
+ s.cacheInvalidator,
+ s.limitHandlers,
+ )
if err != nil {
return nil, err
}
@@ -152,7 +159,13 @@ func (s *GitalyServerFactory) CreateExternal(secure bool) (*grpc.Server, error)
// CreateInternal creates a new internal gRPC server. Internal servers are closed
// after the external ones when gracefully shutting down.
func (s *GitalyServerFactory) CreateInternal() (*grpc.Server, error) {
- server, err := New(false, s.cfg, s.logger, s.registry, s.cacheInvalidator, s.limitHandler)
+ server, err := New(
+ false,
+ s.cfg,
+ s.logger,
+ s.registry,
+ s.cacheInvalidator,
+ s.limitHandlers)
if err != nil {
return nil, err
}
diff --git a/internal/gitaly/server/server_factory_test.go b/internal/gitaly/server/server_factory_test.go
index 4d560250c..a28827248 100644
--- a/internal/gitaly/server/server_factory_test.go
+++ b/internal/gitaly/server/server_factory_test.go
@@ -93,7 +93,7 @@ func TestGitalyServerFactory(t *testing.T) {
testhelper.NewDiscardingLogEntry(t),
backchannel.NewRegistry(),
cache.New(cfg, config.NewLocator(cfg)),
- limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters),
+ []*limithandler.LimiterMiddleware{limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)},
)
checkHealth(t, sf, starter.TCP, "localhost:0")
@@ -112,7 +112,7 @@ func TestGitalyServerFactory(t *testing.T) {
testhelper.NewDiscardingLogEntry(t),
backchannel.NewRegistry(),
cache.New(cfg, config.NewLocator(cfg)),
- limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters),
+ []*limithandler.LimiterMiddleware{limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)},
)
t.Cleanup(sf.Stop)
@@ -126,7 +126,7 @@ func TestGitalyServerFactory(t *testing.T) {
testhelper.NewDiscardingLogEntry(t),
backchannel.NewRegistry(),
cache.New(cfg, config.NewLocator(cfg)),
- limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters),
+ []*limithandler.LimiterMiddleware{limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)},
)
t.Cleanup(sf.Stop)
@@ -156,7 +156,7 @@ func TestGitalyServerFactory(t *testing.T) {
logger.WithContext(ctx),
backchannel.NewRegistry(),
cache.New(cfg, config.NewLocator(cfg)),
- limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters),
+ []*limithandler.LimiterMiddleware{limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)},
)
checkHealth(t, sf, starter.TCP, "localhost:0")
@@ -190,7 +190,7 @@ func TestGitalyServerFactory_closeOrder(t *testing.T) {
testhelper.NewDiscardingLogEntry(t),
backchannel.NewRegistry(),
cache.New(cfg, config.NewLocator(cfg)),
- limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters),
+ []*limithandler.LimiterMiddleware{limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)},
)
defer sf.Stop()
diff --git a/internal/gitaly/service/repository/create_fork_test.go b/internal/gitaly/service/repository/create_fork_test.go
index b3d0ceaa3..22aa1c0da 100644
--- a/internal/gitaly/service/repository/create_fork_test.go
+++ b/internal/gitaly/service/repository/create_fork_test.go
@@ -260,7 +260,7 @@ func runSecureServer(t *testing.T, cfg config.Cfg, rubySrv *rubyserver.Server) s
locator := config.NewLocator(cfg)
cache := cache.New(cfg, locator)
limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)
- server, err := gserver.New(true, cfg, testhelper.NewDiscardingLogEntry(t), registry, cache, limitHandler)
+ server, err := gserver.New(true, cfg, testhelper.NewDiscardingLogEntry(t), registry, cache, []*limithandler.LimiterMiddleware{limitHandler})
require.NoError(t, err)
listener, addr := testhelper.GetLocalhostListener(t)
diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go
index d2f212045..753a579e3 100644
--- a/internal/testhelper/testserver/gitaly.go
+++ b/internal/testhelper/testserver/gitaly.go
@@ -158,7 +158,7 @@ func runGitaly(t testing.TB, cfg config.Cfg, rubyServer *rubyserver.Server, regi
gsd.logger.WithField("test", t.Name()),
deps.GetBackchannelRegistry(),
deps.GetDiskCache(),
- deps.GetLimitHandler(),
+ []*limithandler.LimiterMiddleware{deps.GetLimitHandler()},
)
if cfg.InternalSocketDir != "" {