Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2022-03-25 16:13:53 +0300
committerJohn Cai <jcai@gitlab.com>2022-03-30 16:56:57 +0300
commitb1b109f136c945888852def1e1452233fe5ac09f (patch)
tree7e1dd950a8d2729de59e122a2127eeb790303a10
parent30f2a02af920b6b4a5c3c948a2516da3833c76e6 (diff)
server: Allow multiple limit handlers
Now that we are adding a second limit handle, adjust the code to allow for multiple limit handlers to be passed into a server invocation.
-rw-r--r--internal/gitaly/server/server.go101
-rw-r--r--internal/gitaly/server/server_factory.go22
2 files changed, 73 insertions, 50 deletions
diff --git a/internal/gitaly/server/server.go b/internal/gitaly/server/server.go
index 2304c0b8f..83e6341a0 100644
--- a/internal/gitaly/server/server.go
+++ b/internal/gitaly/server/server.go
@@ -59,7 +59,7 @@ func New(
logrusEntry *log.Entry,
registry *backchannel.Registry,
cacheInvalidator diskcache.Invalidator,
- limitHandler *limithandler.LimiterMiddleware,
+ limitHandlers ...*limithandler.LimiterMiddleware,
) (*grpc.Server, error) {
ctxTagOpts := []grpcmwtags.Option{
grpcmwtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor),
@@ -95,56 +95,67 @@ func New(
),
)
+ streamServerInterceptors := []grpc.StreamServerInterceptor{
+ grpcmwtags.StreamServerInterceptor(ctxTagOpts...),
+ grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler
+ metadatahandler.StreamInterceptor,
+ grpcprometheus.StreamServerInterceptor,
+ commandstatshandler.StreamInterceptor,
+ grpcmwlogrus.StreamServerInterceptor(logrusEntry,
+ grpcmwlogrus.WithTimestampFormat(gitalylog.LogTimestampFormat),
+ logMsgProducer,
+ gitalylog.DeciderOption(),
+ ),
+ gitalylog.StreamLogDataCatcherServerInterceptor(),
+ sentryhandler.StreamLogHandler,
+ cancelhandler.Stream, // Should be below LogHandler
+ auth.StreamServerInterceptor(cfg.Auth),
+ }
+ unaryServerInterceptors := []grpc.UnaryServerInterceptor{
+ grpcmwtags.UnaryServerInterceptor(ctxTagOpts...),
+ grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler
+ metadatahandler.UnaryInterceptor,
+ grpcprometheus.UnaryServerInterceptor,
+ commandstatshandler.UnaryInterceptor,
+ grpcmwlogrus.UnaryServerInterceptor(logrusEntry,
+ grpcmwlogrus.WithTimestampFormat(gitalylog.LogTimestampFormat),
+ logMsgProducer,
+ gitalylog.DeciderOption(),
+ ),
+ gitalylog.UnaryLogDataCatcherServerInterceptor(),
+ sentryhandler.UnaryLogHandler,
+ cancelhandler.Unary, // Should be below LogHandler
+ auth.UnaryServerInterceptor(cfg.Auth),
+ }
+ // Should be below auth handler to prevent v2 hmac tokens from timing out while queued
+ for _, limitHandler := range limitHandlers {
+ streamServerInterceptors = append(streamServerInterceptors, limitHandler.StreamInterceptor())
+ unaryServerInterceptors = append(unaryServerInterceptors, limitHandler.UnaryInterceptor())
+ }
+
+ streamServerInterceptors = append(streamServerInterceptors,
+ grpctracing.StreamServerTracingInterceptor(),
+ cache.StreamInvalidator(cacheInvalidator, protoregistry.GitalyProtoPreregistered),
+ // Panic handler should remain last so that application panics will be
+ // converted to errors and logged
+ panichandler.StreamPanicHandler,
+ )
+
+ unaryServerInterceptors = append(unaryServerInterceptors,
+ cache.UnaryInvalidator(cacheInvalidator, protoregistry.GitalyProtoPreregistered),
+ // Panic handler should remain last so that application panics will be
+ // converted to errors and logged
+ panichandler.UnaryPanicHandler,
+ )
+
opts := []grpc.ServerOption{
grpc.StatsHandler(gitalylog.PerRPCLogHandler{
Underlying: &grpcstats.PayloadBytes{},
FieldProducers: []gitalylog.FieldsProducer{grpcstats.FieldsProducer},
}),
grpc.Creds(lm),
- grpc.StreamInterceptor(grpcmw.ChainStreamServer(
- grpcmwtags.StreamServerInterceptor(ctxTagOpts...),
- grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler
- metadatahandler.StreamInterceptor,
- grpcprometheus.StreamServerInterceptor,
- commandstatshandler.StreamInterceptor,
- grpcmwlogrus.StreamServerInterceptor(logrusEntry,
- grpcmwlogrus.WithTimestampFormat(gitalylog.LogTimestampFormat),
- logMsgProducer,
- gitalylog.DeciderOption(),
- ),
- gitalylog.StreamLogDataCatcherServerInterceptor(),
- sentryhandler.StreamLogHandler,
- cancelhandler.Stream, // Should be below LogHandler
- auth.StreamServerInterceptor(cfg.Auth),
- limitHandler.StreamInterceptor(), // Should be below auth handler to prevent v2 hmac tokens from timing out while queued
- grpctracing.StreamServerTracingInterceptor(),
- cache.StreamInvalidator(cacheInvalidator, protoregistry.GitalyProtoPreregistered),
- // Panic handler should remain last so that application panics will be
- // converted to errors and logged
- panichandler.StreamPanicHandler,
- )),
- grpc.UnaryInterceptor(grpcmw.ChainUnaryServer(
- grpcmwtags.UnaryServerInterceptor(ctxTagOpts...),
- grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler
- metadatahandler.UnaryInterceptor,
- grpcprometheus.UnaryServerInterceptor,
- commandstatshandler.UnaryInterceptor,
- grpcmwlogrus.UnaryServerInterceptor(logrusEntry,
- grpcmwlogrus.WithTimestampFormat(gitalylog.LogTimestampFormat),
- logMsgProducer,
- gitalylog.DeciderOption(),
- ),
- gitalylog.UnaryLogDataCatcherServerInterceptor(),
- sentryhandler.UnaryLogHandler,
- cancelhandler.Unary, // Should be below LogHandler
- auth.UnaryServerInterceptor(cfg.Auth),
- limitHandler.UnaryInterceptor(), // Should be below auth handler to prevent v2 hmac tokens from timing out while queued
- grpctracing.UnaryServerTracingInterceptor(),
- cache.UnaryInvalidator(cacheInvalidator, protoregistry.GitalyProtoPreregistered),
- // Panic handler should remain last so that application panics will be
- // converted to errors and logged
- panichandler.UnaryPanicHandler,
- )),
+ grpc.StreamInterceptor(grpcmw.ChainStreamServer(streamServerInterceptors...)),
+ grpc.UnaryInterceptor(grpcmw.ChainUnaryServer(unaryServerInterceptors...)),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: 20 * time.Second,
PermitWithoutStream: true,
diff --git a/internal/gitaly/server/server_factory.go b/internal/gitaly/server/server_factory.go
index 279ad9f70..92d385b61 100644
--- a/internal/gitaly/server/server_factory.go
+++ b/internal/gitaly/server/server_factory.go
@@ -24,7 +24,7 @@ import (
type GitalyServerFactory struct {
registry *backchannel.Registry
cacheInvalidator cache.Invalidator
- limitHandler *limithandler.LimiterMiddleware
+ limitHandlers []*limithandler.LimiterMiddleware
cfg config.Cfg
logger *logrus.Entry
externalServers []*grpc.Server
@@ -38,14 +38,14 @@ func NewGitalyServerFactory(
logger *logrus.Entry,
registry *backchannel.Registry,
cacheInvalidator cache.Invalidator,
- limitHandler *limithandler.LimiterMiddleware,
+ limitHandlers ...*limithandler.LimiterMiddleware,
) *GitalyServerFactory {
return &GitalyServerFactory{
cfg: cfg,
logger: logger,
registry: registry,
cacheInvalidator: cacheInvalidator,
- limitHandler: limitHandler,
+ limitHandlers: limitHandlers,
}
}
@@ -140,7 +140,13 @@ func (s *GitalyServerFactory) GracefulStop() {
// CreateExternal creates a new external gRPC server. The external servers are closed
// before the internal servers when gracefully shutting down.
func (s *GitalyServerFactory) CreateExternal(secure bool) (*grpc.Server, error) {
- server, err := New(secure, s.cfg, s.logger, s.registry, s.cacheInvalidator, s.limitHandler)
+ server, err := New(
+ secure,
+ s.cfg,
+ s.logger,
+ s.registry,
+ s.cacheInvalidator,
+ s.limitHandlers...)
if err != nil {
return nil, err
}
@@ -152,7 +158,13 @@ func (s *GitalyServerFactory) CreateExternal(secure bool) (*grpc.Server, error)
// CreateInternal creates a new internal gRPC server. Internal servers are closed
// after the external ones when gracefully shutting down.
func (s *GitalyServerFactory) CreateInternal() (*grpc.Server, error) {
- server, err := New(false, s.cfg, s.logger, s.registry, s.cacheInvalidator, s.limitHandler)
+ server, err := New(
+ false,
+ s.cfg,
+ s.logger,
+ s.registry,
+ s.cacheInvalidator,
+ s.limitHandlers...)
if err != nil {
return nil, err
}