Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2021-03-30 16:35:36 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2021-04-13 10:32:59 +0300
commit4d806ab4d53fc6135bcfce437dee679f9ded6020 (patch)
tree6d4bc27d4cc216afec33dc497f2c2702032a0cac /internal/praefect/server.go
parent15fca2225b6b033510dd4d3a4ebee6dac0685061 (diff)
Serve RefTransactionService on the backchannel connection
This commit implements Praefect's backchannel server factory. The server factory is a function that returns a new server that should serve on the backchannel connection Gitaly establishes to Praefect. Gitaly only needs to contact Praefect's RefTransactionService, so it is the only registered service. The interceptor stack in the backchannel server matches mostly what Praefect generally has giving us observability into the backchannel server. There few omissions, which are not required by the backchannel server: 1. Auth interceptor is not added. It's not necessary as only the Gitalys Praefect dials can connect to the backchannel server. 2. MethodTypeUnaryInterceptor is not needed as it's only counting the types of proxied RPCs. 3. Keep alives are not configured as Praefect and Gitaly are already sending keep alives on the Praefect to Gitaly stream. Those keep alives are sufficient to keep the underlying network connection open.
Diffstat (limited to 'internal/praefect/server.go')
-rw-r--r--internal/praefect/server.go61
1 files changed, 42 insertions, 19 deletions
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index 8f98c0f9f..52fc43f67 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -10,6 +10,7 @@ import (
grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/internal/backchannel"
"gitlab.com/gitlab-org/gitaly/internal/gitaly/server/auth"
"gitlab.com/gitlab-org/gitaly/internal/helper/fieldextractors"
"gitlab.com/gitlab-org/gitaly/internal/log"
@@ -37,6 +38,41 @@ import (
"google.golang.org/grpc/keepalive"
)
+// NewBackchannelServerFactory returns a ServerFactory that serves the RefTransactionServer on the backchannel
+// connection.
+func NewBackchannelServerFactory(logger *logrus.Entry, svc gitalypb.RefTransactionServer) backchannel.ServerFactory {
+ return func() backchannel.Server {
+ srv := grpc.NewServer(
+ grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
+ commonUnaryServerInterceptors(logger)...,
+ )),
+ )
+ gitalypb.RegisterRefTransactionServer(srv, svc)
+ grpc_prometheus.Register(srv)
+ return srv
+ }
+}
+
+func commonUnaryServerInterceptors(logger *logrus.Entry) []grpc.UnaryServerInterceptor {
+ return []grpc.UnaryServerInterceptor{
+ grpc_ctxtags.UnaryServerInterceptor(ctxtagsInterceptorOption()),
+ grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler
+ metadatahandler.UnaryInterceptor,
+ grpc_prometheus.UnaryServerInterceptor,
+ grpc_logrus.UnaryServerInterceptor(logger, grpc_logrus.WithTimestampFormat(log.LogTimestampFormat)),
+ sentryhandler.UnaryLogHandler,
+ cancelhandler.Unary, // Should be below LogHandler
+ grpctracing.UnaryServerTracingInterceptor(),
+ // Panic handler should remain last so that application panics will be
+ // converted to errors and logged
+ panichandler.UnaryPanicHandler,
+ }
+}
+
+func ctxtagsInterceptorOption() grpc_ctxtags.Option {
+ return grpc_ctxtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor)
+}
+
// NewGRPCServer returns gRPC server with registered proxy-handler and actual services praefect serves on its own.
// It includes a set of unary and stream interceptors required to add logging, authentication, etc.
func NewGRPCServer(
@@ -53,12 +89,8 @@ func NewGRPCServer(
primaryGetter PrimaryGetter,
grpcOpts ...grpc.ServerOption,
) *grpc.Server {
- ctxTagOpts := []grpc_ctxtags.Option{
- grpc_ctxtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor),
- }
-
streamInterceptors := []grpc.StreamServerInterceptor{
- grpc_ctxtags.StreamServerInterceptor(ctxTagOpts...),
+ grpc_ctxtags.StreamServerInterceptor(ctxtagsInterceptorOption()),
grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler
middleware.MethodTypeStreamInterceptor(registry),
metadatahandler.StreamInterceptor,
@@ -82,20 +114,11 @@ func NewGRPCServer(
grpcOpts = append(grpcOpts, []grpc.ServerOption{
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(streamInterceptors...)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
- grpc_ctxtags.UnaryServerInterceptor(ctxTagOpts...),
- grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler
- middleware.MethodTypeUnaryInterceptor(registry),
- metadatahandler.UnaryInterceptor,
- grpc_prometheus.UnaryServerInterceptor,
- grpc_logrus.UnaryServerInterceptor(logger,
- grpc_logrus.WithTimestampFormat(log.LogTimestampFormat)),
- sentryhandler.UnaryLogHandler,
- cancelhandler.Unary, // Should be below LogHandler
- grpctracing.UnaryServerTracingInterceptor(),
- auth.UnaryServerInterceptor(conf.Auth),
- // Panic handler should remain last so that application panics will be
- // converted to errors and logged
- panichandler.UnaryPanicHandler,
+ append(
+ commonUnaryServerInterceptors(logger),
+ middleware.MethodTypeUnaryInterceptor(registry),
+ auth.UnaryServerInterceptor(conf.Auth),
+ )...,
)),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: 20 * time.Second,