diff options
author | John Cai <jcai@gitlab.com> | 2022-03-25 16:13:53 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2022-03-30 16:56:57 +0300 |
commit | b1b109f136c945888852def1e1452233fe5ac09f (patch) | |
tree | 7e1dd950a8d2729de59e122a2127eeb790303a10 | |
parent | 30f2a02af920b6b4a5c3c948a2516da3833c76e6 (diff) |
server: Allow multiple limit handlers
Now that we are adding a second limit handle, adjust the code to allow
for multiple limit handlers to be passed into a server invocation.
-rw-r--r-- | internal/gitaly/server/server.go | 101 | ||||
-rw-r--r-- | internal/gitaly/server/server_factory.go | 22 |
2 files changed, 73 insertions, 50 deletions
diff --git a/internal/gitaly/server/server.go b/internal/gitaly/server/server.go index 2304c0b8f..83e6341a0 100644 --- a/internal/gitaly/server/server.go +++ b/internal/gitaly/server/server.go @@ -59,7 +59,7 @@ func New( logrusEntry *log.Entry, registry *backchannel.Registry, cacheInvalidator diskcache.Invalidator, - limitHandler *limithandler.LimiterMiddleware, + limitHandlers ...*limithandler.LimiterMiddleware, ) (*grpc.Server, error) { ctxTagOpts := []grpcmwtags.Option{ grpcmwtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor), @@ -95,56 +95,67 @@ func New( ), ) + streamServerInterceptors := []grpc.StreamServerInterceptor{ + grpcmwtags.StreamServerInterceptor(ctxTagOpts...), + grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler + metadatahandler.StreamInterceptor, + grpcprometheus.StreamServerInterceptor, + commandstatshandler.StreamInterceptor, + grpcmwlogrus.StreamServerInterceptor(logrusEntry, + grpcmwlogrus.WithTimestampFormat(gitalylog.LogTimestampFormat), + logMsgProducer, + gitalylog.DeciderOption(), + ), + gitalylog.StreamLogDataCatcherServerInterceptor(), + sentryhandler.StreamLogHandler, + cancelhandler.Stream, // Should be below LogHandler + auth.StreamServerInterceptor(cfg.Auth), + } + unaryServerInterceptors := []grpc.UnaryServerInterceptor{ + grpcmwtags.UnaryServerInterceptor(ctxTagOpts...), + grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler + metadatahandler.UnaryInterceptor, + grpcprometheus.UnaryServerInterceptor, + commandstatshandler.UnaryInterceptor, + grpcmwlogrus.UnaryServerInterceptor(logrusEntry, + grpcmwlogrus.WithTimestampFormat(gitalylog.LogTimestampFormat), + logMsgProducer, + gitalylog.DeciderOption(), + ), + gitalylog.UnaryLogDataCatcherServerInterceptor(), + sentryhandler.UnaryLogHandler, + cancelhandler.Unary, // Should be below LogHandler + auth.UnaryServerInterceptor(cfg.Auth), + } + // Should be below auth handler to prevent v2 hmac tokens from timing out while queued + for _, limitHandler := range limitHandlers { + streamServerInterceptors = append(streamServerInterceptors, limitHandler.StreamInterceptor()) + unaryServerInterceptors = append(unaryServerInterceptors, limitHandler.UnaryInterceptor()) + } + + streamServerInterceptors = append(streamServerInterceptors, + grpctracing.StreamServerTracingInterceptor(), + cache.StreamInvalidator(cacheInvalidator, protoregistry.GitalyProtoPreregistered), + // Panic handler should remain last so that application panics will be + // converted to errors and logged + panichandler.StreamPanicHandler, + ) + + unaryServerInterceptors = append(unaryServerInterceptors, + cache.UnaryInvalidator(cacheInvalidator, protoregistry.GitalyProtoPreregistered), + // Panic handler should remain last so that application panics will be + // converted to errors and logged + panichandler.UnaryPanicHandler, + ) + opts := []grpc.ServerOption{ grpc.StatsHandler(gitalylog.PerRPCLogHandler{ Underlying: &grpcstats.PayloadBytes{}, FieldProducers: []gitalylog.FieldsProducer{grpcstats.FieldsProducer}, }), grpc.Creds(lm), - grpc.StreamInterceptor(grpcmw.ChainStreamServer( - grpcmwtags.StreamServerInterceptor(ctxTagOpts...), - grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler - metadatahandler.StreamInterceptor, - grpcprometheus.StreamServerInterceptor, - commandstatshandler.StreamInterceptor, - grpcmwlogrus.StreamServerInterceptor(logrusEntry, - grpcmwlogrus.WithTimestampFormat(gitalylog.LogTimestampFormat), - logMsgProducer, - gitalylog.DeciderOption(), - ), - gitalylog.StreamLogDataCatcherServerInterceptor(), - sentryhandler.StreamLogHandler, - cancelhandler.Stream, // Should be below LogHandler - auth.StreamServerInterceptor(cfg.Auth), - limitHandler.StreamInterceptor(), // Should be below auth handler to prevent v2 hmac tokens from timing out while queued - grpctracing.StreamServerTracingInterceptor(), - cache.StreamInvalidator(cacheInvalidator, protoregistry.GitalyProtoPreregistered), - // Panic handler should remain last so that application panics will be - // converted to errors and logged - panichandler.StreamPanicHandler, - )), - grpc.UnaryInterceptor(grpcmw.ChainUnaryServer( - grpcmwtags.UnaryServerInterceptor(ctxTagOpts...), - grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler - metadatahandler.UnaryInterceptor, - grpcprometheus.UnaryServerInterceptor, - commandstatshandler.UnaryInterceptor, - grpcmwlogrus.UnaryServerInterceptor(logrusEntry, - grpcmwlogrus.WithTimestampFormat(gitalylog.LogTimestampFormat), - logMsgProducer, - gitalylog.DeciderOption(), - ), - gitalylog.UnaryLogDataCatcherServerInterceptor(), - sentryhandler.UnaryLogHandler, - cancelhandler.Unary, // Should be below LogHandler - auth.UnaryServerInterceptor(cfg.Auth), - limitHandler.UnaryInterceptor(), // Should be below auth handler to prevent v2 hmac tokens from timing out while queued - grpctracing.UnaryServerTracingInterceptor(), - cache.UnaryInvalidator(cacheInvalidator, protoregistry.GitalyProtoPreregistered), - // Panic handler should remain last so that application panics will be - // converted to errors and logged - panichandler.UnaryPanicHandler, - )), + grpc.StreamInterceptor(grpcmw.ChainStreamServer(streamServerInterceptors...)), + grpc.UnaryInterceptor(grpcmw.ChainUnaryServer(unaryServerInterceptors...)), grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ MinTime: 20 * time.Second, PermitWithoutStream: true, diff --git a/internal/gitaly/server/server_factory.go b/internal/gitaly/server/server_factory.go index 279ad9f70..92d385b61 100644 --- a/internal/gitaly/server/server_factory.go +++ b/internal/gitaly/server/server_factory.go @@ -24,7 +24,7 @@ import ( type GitalyServerFactory struct { registry *backchannel.Registry cacheInvalidator cache.Invalidator - limitHandler *limithandler.LimiterMiddleware + limitHandlers []*limithandler.LimiterMiddleware cfg config.Cfg logger *logrus.Entry externalServers []*grpc.Server @@ -38,14 +38,14 @@ func NewGitalyServerFactory( logger *logrus.Entry, registry *backchannel.Registry, cacheInvalidator cache.Invalidator, - limitHandler *limithandler.LimiterMiddleware, + limitHandlers ...*limithandler.LimiterMiddleware, ) *GitalyServerFactory { return &GitalyServerFactory{ cfg: cfg, logger: logger, registry: registry, cacheInvalidator: cacheInvalidator, - limitHandler: limitHandler, + limitHandlers: limitHandlers, } } @@ -140,7 +140,13 @@ func (s *GitalyServerFactory) GracefulStop() { // CreateExternal creates a new external gRPC server. The external servers are closed // before the internal servers when gracefully shutting down. func (s *GitalyServerFactory) CreateExternal(secure bool) (*grpc.Server, error) { - server, err := New(secure, s.cfg, s.logger, s.registry, s.cacheInvalidator, s.limitHandler) + server, err := New( + secure, + s.cfg, + s.logger, + s.registry, + s.cacheInvalidator, + s.limitHandlers...) if err != nil { return nil, err } @@ -152,7 +158,13 @@ func (s *GitalyServerFactory) CreateExternal(secure bool) (*grpc.Server, error) // CreateInternal creates a new internal gRPC server. Internal servers are closed // after the external ones when gracefully shutting down. func (s *GitalyServerFactory) CreateInternal() (*grpc.Server, error) { - server, err := New(false, s.cfg, s.logger, s.registry, s.cacheInvalidator, s.limitHandler) + server, err := New( + false, + s.cfg, + s.logger, + s.registry, + s.cacheInvalidator, + s.limitHandlers...) if err != nil { return nil, err } |