diff options
author | John Cai <jcai@gitlab.com> | 2022-03-25 16:13:53 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2022-04-06 22:27:35 +0300 |
commit | f4cb81c2dcdfaaa67a2752f7451917f6080db652 (patch) | |
tree | f3ede5ddc01a33abd1fec48b70cd95321b5b353f | |
parent | 7ce0a1bb170eb89da15a12acf9446f2cb43262a1 (diff) |
server: Allow multiple limit handlers
Now that we are adding a second limit handle, adjust the code to allow
for multiple limit handlers to be passed into a server invocation.
-rw-r--r-- | cmd/gitaly-ssh/auth_test.go | 2 | ||||
-rw-r--r-- | cmd/gitaly/main.go | 2 | ||||
-rw-r--r-- | internal/gitaly/server/auth_test.go | 4 | ||||
-rw-r--r-- | internal/gitaly/server/server.go | 101 | ||||
-rw-r--r-- | internal/gitaly/server/server_factory.go | 23 | ||||
-rw-r--r-- | internal/gitaly/server/server_factory_test.go | 10 | ||||
-rw-r--r-- | internal/gitaly/service/repository/create_fork_test.go | 2 | ||||
-rw-r--r-- | internal/testhelper/testserver/gitaly.go | 2 |
8 files changed, 85 insertions, 61 deletions
diff --git a/cmd/gitaly-ssh/auth_test.go b/cmd/gitaly-ssh/auth_test.go index 5b8cb9ee8..070006ce5 100644 --- a/cmd/gitaly-ssh/auth_test.go +++ b/cmd/gitaly-ssh/auth_test.go @@ -152,7 +152,7 @@ func runServer(t *testing.T, secure bool, cfg config.Cfg, connectionType string, )) limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters) diskCache := cache.New(cfg, locator) - srv, err := server.New(secure, cfg, testhelper.NewDiscardingLogEntry(t), registry, diskCache, limitHandler) + srv, err := server.New(secure, cfg, testhelper.NewDiscardingLogEntry(t), registry, diskCache, []*limithandler.LimiterMiddleware{limitHandler}) require.NoError(t, err) setup.RegisterAll(srv, &service.Dependencies{ Cfg: cfg, diff --git a/cmd/gitaly/main.go b/cmd/gitaly/main.go index bba3d4ca8..818f5ce2f 100644 --- a/cmd/gitaly/main.go +++ b/cmd/gitaly/main.go @@ -216,7 +216,7 @@ func run(cfg config.Cfg) error { limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters) prometheus.MustRegister(limitHandler) - gitalyServerFactory := server.NewGitalyServerFactory(cfg, glog.Default(), registry, diskCache, limitHandler) + gitalyServerFactory := server.NewGitalyServerFactory(cfg, glog.Default(), registry, diskCache, []*limithandler.LimiterMiddleware{limitHandler}) defer gitalyServerFactory.Stop() ling, err := linguist.New(cfg, gitCmdFactory) diff --git a/internal/gitaly/server/auth_test.go b/internal/gitaly/server/auth_test.go index b54fca091..78107cbe7 100644 --- a/internal/gitaly/server/auth_test.go +++ b/internal/gitaly/server/auth_test.go @@ -203,7 +203,7 @@ func runServer(t *testing.T, cfg config.Cfg) string { limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters) updaterWithHooks := updateref.NewUpdaterWithHooks(cfg, locator, hookManager, gitCmdFactory, catfileCache) - srv, err := New(false, cfg, testhelper.NewDiscardingLogEntry(t), registry, diskCache, limitHandler) + srv, err := New(false, cfg, testhelper.NewDiscardingLogEntry(t), registry, diskCache, []*limithandler.LimiterMiddleware{limitHandler}) require.NoError(t, err) setup.RegisterAll(srv, &service.Dependencies{ @@ -244,7 +244,7 @@ func runSecureServer(t *testing.T, cfg config.Cfg) string { testhelper.NewDiscardingLogEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg)), - limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters), + []*limithandler.LimiterMiddleware{limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)}, ) require.NoError(t, err) diff --git a/internal/gitaly/server/server.go b/internal/gitaly/server/server.go index 2304c0b8f..ab00acd6e 100644 --- a/internal/gitaly/server/server.go +++ b/internal/gitaly/server/server.go @@ -59,7 +59,7 @@ func New( logrusEntry *log.Entry, registry *backchannel.Registry, cacheInvalidator diskcache.Invalidator, - limitHandler *limithandler.LimiterMiddleware, + limitHandlers []*limithandler.LimiterMiddleware, ) (*grpc.Server, error) { ctxTagOpts := []grpcmwtags.Option{ grpcmwtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor), @@ -95,56 +95,67 @@ func New( ), ) + streamServerInterceptors := []grpc.StreamServerInterceptor{ + grpcmwtags.StreamServerInterceptor(ctxTagOpts...), + grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler + metadatahandler.StreamInterceptor, + grpcprometheus.StreamServerInterceptor, + commandstatshandler.StreamInterceptor, + grpcmwlogrus.StreamServerInterceptor(logrusEntry, + grpcmwlogrus.WithTimestampFormat(gitalylog.LogTimestampFormat), + logMsgProducer, + gitalylog.DeciderOption(), + ), + gitalylog.StreamLogDataCatcherServerInterceptor(), + sentryhandler.StreamLogHandler, + cancelhandler.Stream, // Should be below LogHandler + auth.StreamServerInterceptor(cfg.Auth), + } + unaryServerInterceptors := []grpc.UnaryServerInterceptor{ + grpcmwtags.UnaryServerInterceptor(ctxTagOpts...), + grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler + metadatahandler.UnaryInterceptor, + grpcprometheus.UnaryServerInterceptor, + commandstatshandler.UnaryInterceptor, + grpcmwlogrus.UnaryServerInterceptor(logrusEntry, + grpcmwlogrus.WithTimestampFormat(gitalylog.LogTimestampFormat), + logMsgProducer, + gitalylog.DeciderOption(), + ), + gitalylog.UnaryLogDataCatcherServerInterceptor(), + sentryhandler.UnaryLogHandler, + cancelhandler.Unary, // Should be below LogHandler + auth.UnaryServerInterceptor(cfg.Auth), + } + // Should be below auth handler to prevent v2 hmac tokens from timing out while queued + for _, limitHandler := range limitHandlers { + streamServerInterceptors = append(streamServerInterceptors, limitHandler.StreamInterceptor()) + unaryServerInterceptors = append(unaryServerInterceptors, limitHandler.UnaryInterceptor()) + } + + streamServerInterceptors = append(streamServerInterceptors, + grpctracing.StreamServerTracingInterceptor(), + cache.StreamInvalidator(cacheInvalidator, protoregistry.GitalyProtoPreregistered), + // Panic handler should remain last so that application panics will be + // converted to errors and logged + panichandler.StreamPanicHandler, + ) + + unaryServerInterceptors = append(unaryServerInterceptors, + cache.UnaryInvalidator(cacheInvalidator, protoregistry.GitalyProtoPreregistered), + // Panic handler should remain last so that application panics will be + // converted to errors and logged + panichandler.UnaryPanicHandler, + ) + opts := []grpc.ServerOption{ grpc.StatsHandler(gitalylog.PerRPCLogHandler{ Underlying: &grpcstats.PayloadBytes{}, FieldProducers: []gitalylog.FieldsProducer{grpcstats.FieldsProducer}, }), grpc.Creds(lm), - grpc.StreamInterceptor(grpcmw.ChainStreamServer( - grpcmwtags.StreamServerInterceptor(ctxTagOpts...), - grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler - metadatahandler.StreamInterceptor, - grpcprometheus.StreamServerInterceptor, - commandstatshandler.StreamInterceptor, - grpcmwlogrus.StreamServerInterceptor(logrusEntry, - grpcmwlogrus.WithTimestampFormat(gitalylog.LogTimestampFormat), - logMsgProducer, - gitalylog.DeciderOption(), - ), - gitalylog.StreamLogDataCatcherServerInterceptor(), - sentryhandler.StreamLogHandler, - cancelhandler.Stream, // Should be below LogHandler - auth.StreamServerInterceptor(cfg.Auth), - limitHandler.StreamInterceptor(), // Should be below auth handler to prevent v2 hmac tokens from timing out while queued - grpctracing.StreamServerTracingInterceptor(), - cache.StreamInvalidator(cacheInvalidator, protoregistry.GitalyProtoPreregistered), - // Panic handler should remain last so that application panics will be - // converted to errors and logged - panichandler.StreamPanicHandler, - )), - grpc.UnaryInterceptor(grpcmw.ChainUnaryServer( - grpcmwtags.UnaryServerInterceptor(ctxTagOpts...), - grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler - metadatahandler.UnaryInterceptor, - grpcprometheus.UnaryServerInterceptor, - commandstatshandler.UnaryInterceptor, - grpcmwlogrus.UnaryServerInterceptor(logrusEntry, - grpcmwlogrus.WithTimestampFormat(gitalylog.LogTimestampFormat), - logMsgProducer, - gitalylog.DeciderOption(), - ), - gitalylog.UnaryLogDataCatcherServerInterceptor(), - sentryhandler.UnaryLogHandler, - cancelhandler.Unary, // Should be below LogHandler - auth.UnaryServerInterceptor(cfg.Auth), - limitHandler.UnaryInterceptor(), // Should be below auth handler to prevent v2 hmac tokens from timing out while queued - grpctracing.UnaryServerTracingInterceptor(), - cache.UnaryInvalidator(cacheInvalidator, protoregistry.GitalyProtoPreregistered), - // Panic handler should remain last so that application panics will be - // converted to errors and logged - panichandler.UnaryPanicHandler, - )), + grpc.StreamInterceptor(grpcmw.ChainStreamServer(streamServerInterceptors...)), + grpc.UnaryInterceptor(grpcmw.ChainUnaryServer(unaryServerInterceptors...)), grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ MinTime: 20 * time.Second, PermitWithoutStream: true, diff --git a/internal/gitaly/server/server_factory.go b/internal/gitaly/server/server_factory.go index 279ad9f70..b561045ff 100644 --- a/internal/gitaly/server/server_factory.go +++ b/internal/gitaly/server/server_factory.go @@ -24,7 +24,7 @@ import ( type GitalyServerFactory struct { registry *backchannel.Registry cacheInvalidator cache.Invalidator - limitHandler *limithandler.LimiterMiddleware + limitHandlers []*limithandler.LimiterMiddleware cfg config.Cfg logger *logrus.Entry externalServers []*grpc.Server @@ -38,14 +38,14 @@ func NewGitalyServerFactory( logger *logrus.Entry, registry *backchannel.Registry, cacheInvalidator cache.Invalidator, - limitHandler *limithandler.LimiterMiddleware, + limitHandlers []*limithandler.LimiterMiddleware, ) *GitalyServerFactory { return &GitalyServerFactory{ cfg: cfg, logger: logger, registry: registry, cacheInvalidator: cacheInvalidator, - limitHandler: limitHandler, + limitHandlers: limitHandlers, } } @@ -140,7 +140,14 @@ func (s *GitalyServerFactory) GracefulStop() { // CreateExternal creates a new external gRPC server. The external servers are closed // before the internal servers when gracefully shutting down. func (s *GitalyServerFactory) CreateExternal(secure bool) (*grpc.Server, error) { - server, err := New(secure, s.cfg, s.logger, s.registry, s.cacheInvalidator, s.limitHandler) + server, err := New( + secure, + s.cfg, + s.logger, + s.registry, + s.cacheInvalidator, + s.limitHandlers, + ) if err != nil { return nil, err } @@ -152,7 +159,13 @@ func (s *GitalyServerFactory) CreateExternal(secure bool) (*grpc.Server, error) // CreateInternal creates a new internal gRPC server. Internal servers are closed // after the external ones when gracefully shutting down. func (s *GitalyServerFactory) CreateInternal() (*grpc.Server, error) { - server, err := New(false, s.cfg, s.logger, s.registry, s.cacheInvalidator, s.limitHandler) + server, err := New( + false, + s.cfg, + s.logger, + s.registry, + s.cacheInvalidator, + s.limitHandlers) if err != nil { return nil, err } diff --git a/internal/gitaly/server/server_factory_test.go b/internal/gitaly/server/server_factory_test.go index 4d560250c..a28827248 100644 --- a/internal/gitaly/server/server_factory_test.go +++ b/internal/gitaly/server/server_factory_test.go @@ -93,7 +93,7 @@ func TestGitalyServerFactory(t *testing.T) { testhelper.NewDiscardingLogEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg)), - limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters), + []*limithandler.LimiterMiddleware{limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)}, ) checkHealth(t, sf, starter.TCP, "localhost:0") @@ -112,7 +112,7 @@ func TestGitalyServerFactory(t *testing.T) { testhelper.NewDiscardingLogEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg)), - limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters), + []*limithandler.LimiterMiddleware{limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)}, ) t.Cleanup(sf.Stop) @@ -126,7 +126,7 @@ func TestGitalyServerFactory(t *testing.T) { testhelper.NewDiscardingLogEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg)), - limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters), + []*limithandler.LimiterMiddleware{limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)}, ) t.Cleanup(sf.Stop) @@ -156,7 +156,7 @@ func TestGitalyServerFactory(t *testing.T) { logger.WithContext(ctx), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg)), - limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters), + []*limithandler.LimiterMiddleware{limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)}, ) checkHealth(t, sf, starter.TCP, "localhost:0") @@ -190,7 +190,7 @@ func TestGitalyServerFactory_closeOrder(t *testing.T) { testhelper.NewDiscardingLogEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg)), - limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters), + []*limithandler.LimiterMiddleware{limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)}, ) defer sf.Stop() diff --git a/internal/gitaly/service/repository/create_fork_test.go b/internal/gitaly/service/repository/create_fork_test.go index b3d0ceaa3..22aa1c0da 100644 --- a/internal/gitaly/service/repository/create_fork_test.go +++ b/internal/gitaly/service/repository/create_fork_test.go @@ -260,7 +260,7 @@ func runSecureServer(t *testing.T, cfg config.Cfg, rubySrv *rubyserver.Server) s locator := config.NewLocator(cfg) cache := cache.New(cfg, locator) limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters) - server, err := gserver.New(true, cfg, testhelper.NewDiscardingLogEntry(t), registry, cache, limitHandler) + server, err := gserver.New(true, cfg, testhelper.NewDiscardingLogEntry(t), registry, cache, []*limithandler.LimiterMiddleware{limitHandler}) require.NoError(t, err) listener, addr := testhelper.GetLocalhostListener(t) diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index d2f212045..753a579e3 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -158,7 +158,7 @@ func runGitaly(t testing.TB, cfg config.Cfg, rubyServer *rubyserver.Server, regi gsd.logger.WithField("test", t.Name()), deps.GetBackchannelRegistry(), deps.GetDiskCache(), - deps.GetLimitHandler(), + []*limithandler.LimiterMiddleware{deps.GetLimitHandler()}, ) if cfg.InternalSocketDir != "" { |