Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavlo Strokov <pstrokov@gitlab.com>2020-06-26 14:16:27 +0300
committerPavlo Strokov <pstrokov@gitlab.com>2020-06-26 14:16:27 +0300
commit389ae3336772c9bbf7cd21f2f6aed262e8ae0357 (patch)
treeebcbff78afafb87cc341784c9839388308f68565 /internal
parente57acd088cf151b6cecc70b5c7812955b3f49c93 (diff)
Praefect: server factory introduction
Introduces server factory for creating gRPC servers. Praefect gRPC server created by separate function and can be reused in tests to check routing. Part of: https://gitlab.com/gitlab-org/gitaly/-/issues/1698
Diffstat (limited to 'internal')
-rw-r--r--internal/bootstrap/starter/starter.go7
-rw-r--r--internal/bootstrap/starter/starter_test.go5
-rw-r--r--internal/praefect/auth_test.go10
-rw-r--r--internal/praefect/config/config_test.go6
-rw-r--r--internal/praefect/helper_test.go22
-rw-r--r--internal/praefect/replicator_test.go11
-rw-r--r--internal/praefect/server.go123
-rw-r--r--internal/praefect/server_factory.go117
-rw-r--r--internal/praefect/server_factory_test.go174
9 files changed, 362 insertions, 113 deletions
diff --git a/internal/bootstrap/starter/starter.go b/internal/bootstrap/starter/starter.go
index 7c43781af..a8b7c018c 100644
--- a/internal/bootstrap/starter/starter.go
+++ b/internal/bootstrap/starter/starter.go
@@ -23,7 +23,8 @@ const (
)
var (
- errEmptySchema = errors.New("empty schema can't be used")
+ // ErrEmptySchema signals that the address has no schema in it.
+ ErrEmptySchema = errors.New("empty schema can't be used")
errEmptyAddress = errors.New("empty address can't be used")
)
@@ -36,7 +37,7 @@ func ParseEndpoint(endpoint string) (Config, error) {
parts := strings.Split(endpoint, separator)
if len(parts) != 2 {
- return Config{}, fmt.Errorf("unsupported format: %q", endpoint)
+ return Config{}, fmt.Errorf("unsupported format: %q: %w", endpoint, ErrEmptySchema)
}
if err := verifySchema(parts[0]); err != nil {
@@ -65,7 +66,7 @@ func ComposeEndpoint(schema, address string) (string, error) {
func verifySchema(schema string) error {
switch schema {
case "":
- return errEmptySchema
+ return ErrEmptySchema
case TCP, TLS, Unix:
return nil
default:
diff --git a/internal/bootstrap/starter/starter_test.go b/internal/bootstrap/starter/starter_test.go
index 7347d0e13..c5d84b1f8 100644
--- a/internal/bootstrap/starter/starter_test.go
+++ b/internal/bootstrap/starter/starter_test.go
@@ -2,6 +2,7 @@ package starter
import (
"errors"
+ "fmt"
"testing"
"github.com/stretchr/testify/require"
@@ -110,12 +111,12 @@ func TestParseEndpoint(t *testing.T) {
{
desc: "no schema",
addr: "://127.0.0.1:2306",
- expErr: errEmptySchema,
+ expErr: ErrEmptySchema,
},
{
desc: "bad format",
addr: "127.0.0.1:2306",
- expErr: errors.New(`unsupported format: "127.0.0.1:2306"`),
+ expErr: fmt.Errorf(`unsupported format: "127.0.0.1:2306": %w`, ErrEmptySchema),
},
{
desc: "tcp schema addresses",
diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go
index fe5922701..b112a716f 100644
--- a/internal/praefect/auth_test.go
+++ b/internal/praefect/auth_test.go
@@ -50,7 +50,7 @@ func TestAuthFailures(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
srv, serverSocketPath, cleanup := runServer(t, "quxbaz", true)
- defer srv.Shutdown(ctx)
+ defer srv.Stop()
defer cleanup()
connOpts := append(tc.opts, grpc.WithInsecure())
@@ -103,7 +103,7 @@ func TestAuthSuccess(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
srv, serverSocketPath, cleanup := runServer(t, tc.token, tc.required)
- defer srv.Shutdown(ctx)
+ defer srv.Stop()
defer cleanup()
connOpts := append(tc.opts, grpc.WithInsecure())
@@ -133,7 +133,7 @@ func dial(serverSocketPath string, opts []grpc.DialOption) (*grpc.ClientConn, er
return grpc.Dial(serverSocketPath, opts...)
}
-func runServer(t *testing.T, token string, required bool) (*Server, string, func()) {
+func runServer(t *testing.T, token string, required bool) (*grpc.Server, string, func()) {
backendToken := "abcxyz"
mockServer := &mockSvc{
serverAccessor: func(_ context.Context, req *mock.SimpleRequest) (*mock.SimpleResponse, error) {
@@ -180,13 +180,13 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func
coordinator := NewCoordinator(queue, nodeMgr, txMgr, conf, registry)
- srv := NewServer(coordinator.StreamDirector, logEntry, registry, conf)
+ srv := NewGRPCServer(conf, logEntry, registry, coordinator.StreamDirector, nodeMgr, txMgr, queue)
serverSocketPath := testhelper.GetTemporaryGitalySocketFileName()
listener, err := net.Listen("unix", serverSocketPath)
require.NoError(t, err)
- go srv.Serve(listener, false)
+ go srv.Serve(listener)
return srv, "unix://" + serverSocketPath, cleanup
}
diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go
index 1944727f0..a06ce8e03 100644
--- a/internal/praefect/config/config_test.go
+++ b/internal/praefect/config/config_test.go
@@ -6,7 +6,6 @@ import (
"testing"
"time"
- "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/internal/config"
"gitlab.com/gitlab-org/gitaly/internal/config/log"
@@ -201,11 +200,12 @@ func TestConfigValidation(t *testing.T) {
t.Run(tc.desc, func(t *testing.T) {
err := tc.config.Validate()
if tc.errMsg == "" {
- assert.NoError(t, err)
+ require.NoError(t, err)
return
}
- assert.Contains(t, err.Error(), tc.errMsg)
+ require.Error(t, err)
+ require.Contains(t, err.Error(), tc.errMsg)
})
}
}
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index 917c32de4..d4a41499b 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -89,7 +89,7 @@ func assertPrimariesExist(t testing.TB, conf config.Config) {
// config.Nodes. There must be a 1-to-1 mapping between backend server and
// configured storage node.
// requires there to be only 1 virtual storage
-func runPraefectServerWithMock(t *testing.T, conf config.Config, queue datastore.ReplicationEventQueue, backends map[string]mock.SimpleServiceServer) (*grpc.ClientConn, *Server, testhelper.Cleanup) {
+func runPraefectServerWithMock(t *testing.T, conf config.Config, queue datastore.ReplicationEventQueue, backends map[string]mock.SimpleServiceServer) (*grpc.ClientConn, *grpc.Server, testhelper.Cleanup) {
r, err := protoregistry.New(mustLoadProtoReg(t))
require.NoError(t, err)
@@ -182,13 +182,13 @@ func withRealGitalyShared(t testing.TB) func([]*config.VirtualStorage) []testhel
}
}
-func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.ClientConn, *Server, testhelper.Cleanup) {
+func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.ClientConn, *grpc.Server, testhelper.Cleanup) {
return runPraefectServerWithGitalyWithDatastore(t, conf, defaultQueue(conf))
}
// runPraefectServerWithGitaly runs a praefect server with actual Gitaly nodes
// requires exactly 1 virtual storage
-func runPraefectServerWithGitalyWithDatastore(t *testing.T, conf config.Config, queue datastore.ReplicationEventQueue) (*grpc.ClientConn, *Server, testhelper.Cleanup) {
+func runPraefectServerWithGitalyWithDatastore(t *testing.T, conf config.Config, queue datastore.ReplicationEventQueue) (*grpc.ClientConn, *grpc.Server, testhelper.Cleanup) {
return runPraefectServer(t, conf, buildOptions{
withQueue: queue,
withTxMgr: transactions.NewManager(),
@@ -211,7 +211,7 @@ func defaultNodeMgr(t testing.TB, conf config.Config, queue datastore.Replicatio
return nodeMgr
}
-func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grpc.ClientConn, *Server, testhelper.Cleanup) {
+func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grpc.ClientConn, *grpc.Server, testhelper.Cleanup) {
assertPrimariesExist(t, conf)
var cleanups []testhelper.Cleanup
@@ -250,7 +250,8 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp
opt.withQueue,
opt.withNodeMgr,
)
- prf := NewServer(coordinator.StreamDirector, opt.withLogger, opt.withAnnotations, conf)
+
+ prf := NewGRPCServer(conf, opt.withLogger, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, opt.withNodeMgr, opt.withTxMgr, opt.withQueue)
listener, port := listenAvailPort(t)
t.Logf("proxy listening on port %d", port)
@@ -258,8 +259,7 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp
errQ := make(chan error)
ctx, cancel := testhelper.Context()
- prf.RegisterServices(opt.withNodeMgr, opt.withTxMgr, conf, opt.withQueue)
- go func() { errQ <- prf.Serve(listener, false) }()
+ go func() { errQ <- prf.Serve(listener) }()
replmgr.ProcessBacklog(ctx, noopBackoffFunc)
// dial client to praefect
@@ -270,9 +270,7 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp
cu()
}
- ctx, timed := context.WithTimeout(ctx, time.Second)
- defer timed()
- require.NoError(t, prf.Shutdown(ctx))
+ prf.Stop()
cancel()
require.Error(t, context.Canceled, <-errQ)
@@ -290,7 +288,7 @@ type partialGitaly interface {
healthpb.HealthServer
}
-func registerServices(server *grpc.Server, pg partialGitaly) {
+func registerGitalyServices(server *grpc.Server, pg partialGitaly) {
gitalypb.RegisterServerServiceServer(server, pg)
gitalypb.RegisterRepositoryServiceServer(server, pg)
gitalypb.RegisterInternalGitalyServer(server, pg)
@@ -325,7 +323,7 @@ func runInternalGitalyServer(t testing.TB, storages []gconfig.Storage, token str
internalListener, err := net.Listen("unix", internalSocketPath)
require.NoError(t, err)
- registerServices(server, realGitaly(storages, token, internalSocketPath))
+ registerGitalyServices(server, realGitaly(storages, token, internalSocketPath))
errQ := make(chan error)
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 55e2942cb..425299ca4 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -225,18 +225,13 @@ func TestPropagateReplicationJob(t *testing.T) {
replmgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queue, nodeMgr)
- prf := NewServer(
- coordinator.StreamDirector,
- logEntry,
- protoregistry.GitalyProtoPreregistered,
- conf,
- )
+ prf := NewGRPCServer(conf, logEntry, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, nodeMgr, txMgr, queue)
+
listener, port := listenAvailPort(t)
ctx, cancel := testhelper.Context()
defer cancel()
- prf.RegisterServices(nodeMgr, txMgr, conf, queue)
- go prf.Serve(listener, false)
+ go prf.Serve(listener)
defer prf.Stop()
cc := dialLocalPort(t, port, false)
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index 229d30200..08f5aac4f 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -3,9 +3,6 @@ calls to a set of Gitaly services.*/
package praefect
import (
- "context"
- "net"
-
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
@@ -35,35 +32,18 @@ import (
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)
-// Server is a praefect server
-type Server struct {
- s *grpc.Server
- l *logrus.Entry
-}
-
-func (srv *Server) warnDupeAddrs(c config.Config) {
- var fishy bool
-
- for _, virtualStorage := range c.VirtualStorages {
- addrSet := map[string]struct{}{}
- for _, n := range virtualStorage.Nodes {
- _, ok := addrSet[n.Address]
- if ok {
- srv.l.Warnf("more than one backend node is hosted at %s", n.Address)
- fishy = true
- continue
- }
- addrSet[n.Address] = struct{}{}
- }
- if fishy {
- srv.l.Warnf("your Praefect configuration may not offer actual redundancy")
- }
- }
-}
-
-// NewServer returns an initialized praefect gPRC proxy server configured
-// with the provided gRPC server options
-func NewServer(director proxy.StreamDirector, l *logrus.Entry, r *protoregistry.Registry, conf config.Config, grpcOpts ...grpc.ServerOption) *Server {
+// NewGRPCServer returns gRPC server with registered proxy-handler and actual services praefect serves on its own.
+// It includes a set of unary and stream interceptors required to add logging, authentication, etc.
+func NewGRPCServer(
+ conf config.Config,
+ logger *logrus.Entry,
+ registry *protoregistry.Registry,
+ director proxy.StreamDirector,
+ nodeMgr nodes.Manager,
+ txMgr *transactions.Manager,
+ queue datastore.ReplicationEventQueue,
+ grpcOpts ...grpc.ServerOption,
+) *grpc.Server {
ctxTagOpts := []grpc_ctxtags.Option{
grpc_ctxtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor),
}
@@ -73,10 +53,10 @@ func NewServer(director proxy.StreamDirector, l *logrus.Entry, r *protoregistry.
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
grpc_ctxtags.StreamServerInterceptor(ctxTagOpts...),
grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler
- middleware.MethodTypeStreamInterceptor(r),
+ middleware.MethodTypeStreamInterceptor(registry),
metadatahandler.StreamInterceptor,
grpc_prometheus.StreamServerInterceptor,
- grpc_logrus.StreamServerInterceptor(l),
+ grpc_logrus.StreamServerInterceptor(logger),
sentryhandler.StreamLogHandler,
cancelhandler.Stream, // Should be below LogHandler
grpctracing.StreamServerTracingInterceptor(),
@@ -88,10 +68,10 @@ func NewServer(director proxy.StreamDirector, l *logrus.Entry, r *protoregistry.
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
grpc_ctxtags.UnaryServerInterceptor(ctxTagOpts...),
grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler
- middleware.MethodTypeUnaryInterceptor(r),
+ middleware.MethodTypeUnaryInterceptor(registry),
metadatahandler.UnaryInterceptor,
grpc_prometheus.UnaryServerInterceptor,
- grpc_logrus.UnaryServerInterceptor(l),
+ grpc_logrus.UnaryServerInterceptor(logger),
sentryhandler.UnaryLogHandler,
cancelhandler.Unary, // Should be below LogHandler
grpctracing.UnaryServerTracingInterceptor(),
@@ -102,14 +82,11 @@ func NewServer(director proxy.StreamDirector, l *logrus.Entry, r *protoregistry.
)),
}...)
- s := &Server{
- s: grpc.NewServer(grpcOpts...),
- l: l,
- }
+ warnDupeAddrs(logger, conf)
- s.warnDupeAddrs(conf)
-
- return s
+ srv := grpc.NewServer(grpcOpts...)
+ registerServices(srv, nodeMgr, txMgr, conf, queue)
+ return srv
}
func proxyRequiredOpts(director proxy.StreamDirector) []grpc.ServerOption {
@@ -119,47 +96,33 @@ func proxyRequiredOpts(director proxy.StreamDirector) []grpc.ServerOption {
}
}
-// Serve starts serving requests from the listener
-func (srv *Server) Serve(l net.Listener, secure bool) error {
- return srv.s.Serve(l)
-}
-
-// RegisterServices will register any services praefect needs to handle rpcs on its own
-func (srv *Server) RegisterServices(nm nodes.Manager, tm *transactions.Manager, conf config.Config, queue datastore.ReplicationEventQueue) {
+// registerServices registers services praefect needs to handle RPCs on its own.
+func registerServices(srv *grpc.Server, nm nodes.Manager, tm *transactions.Manager, conf config.Config, queue datastore.ReplicationEventQueue) {
// ServerServiceServer is necessary for the ServerInfo RPC
- gitalypb.RegisterServerServiceServer(srv.s, server.NewServer(conf, nm))
- gitalypb.RegisterPraefectInfoServiceServer(srv.s, info.NewServer(nm, conf, queue))
- gitalypb.RegisterRefTransactionServer(srv.s, transaction.NewServer(tm))
- healthpb.RegisterHealthServer(srv.s, health.NewServer())
+ gitalypb.RegisterServerServiceServer(srv, server.NewServer(conf, nm))
+ gitalypb.RegisterPraefectInfoServiceServer(srv, info.NewServer(nm, conf, queue))
+ gitalypb.RegisterRefTransactionServer(srv, transaction.NewServer(tm))
+ healthpb.RegisterHealthServer(srv, health.NewServer())
- grpc_prometheus.Register(srv.s)
+ grpc_prometheus.Register(srv)
}
-// Shutdown will attempt a graceful shutdown of the grpc server. If unable
-// to gracefully shutdown within the context deadline, it will then
-// forcefully shutdown the server and return a context cancellation error.
-func (srv *Server) Shutdown(ctx context.Context) error {
- done := make(chan struct{})
- go func() {
- srv.s.GracefulStop()
- close(done)
- }()
+func warnDupeAddrs(logger logrus.FieldLogger, conf config.Config) {
+ var fishy bool
- select {
- case <-ctx.Done():
- srv.s.Stop()
- return ctx.Err()
- case <-done:
- return nil
+ for _, virtualStorage := range conf.VirtualStorages {
+ addrSet := map[string]struct{}{}
+ for _, n := range virtualStorage.Nodes {
+ _, ok := addrSet[n.Address]
+ if ok {
+ logger.Warnf("more than one backend node is hosted at %s", n.Address)
+ fishy = true
+ continue
+ }
+ addrSet[n.Address] = struct{}{}
+ }
+ if fishy {
+ logger.Warnf("your Praefect configuration may not offer actual redundancy")
+ }
}
}
-
-// GracefulStop stops the praefect server gracefully
-func (srv *Server) GracefulStop() {
- srv.s.GracefulStop()
-}
-
-// Stop stops the praefect server
-func (srv *Server) Stop() {
- srv.s.Stop()
-}
diff --git a/internal/praefect/server_factory.go b/internal/praefect/server_factory.go
new file mode 100644
index 000000000..90309380e
--- /dev/null
+++ b/internal/praefect/server_factory.go
@@ -0,0 +1,117 @@
+package praefect
+
+import (
+ "net"
+ "sync"
+
+ "github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/transactions"
+ "google.golang.org/grpc"
+)
+
+// NewServerFactory returns factory object for initialization of praefect gRPC servers.
+func NewServerFactory(
+ conf config.Config,
+ logger *logrus.Entry,
+ director proxy.StreamDirector,
+ nodeMgr nodes.Manager,
+ txMgr *transactions.Manager,
+ queue datastore.ReplicationEventQueue,
+ registry *protoregistry.Registry,
+) *ServerFactory {
+ return &ServerFactory{
+ conf: conf,
+ logger: logger,
+ director: director,
+ nodeMgr: nodeMgr,
+ txMgr: txMgr,
+ queue: queue,
+ registry: registry,
+ }
+}
+
+// ServerFactory is a factory of praefect grpc servers
+type ServerFactory struct {
+ mtx sync.Mutex
+ conf config.Config
+ logger *logrus.Entry
+ director proxy.StreamDirector
+ nodeMgr nodes.Manager
+ txMgr *transactions.Manager
+ queue datastore.ReplicationEventQueue
+ registry *protoregistry.Registry
+ insecure []*grpc.Server
+}
+
+// Serve starts serving on the provided listener with newly created grpc.Server
+func (s *ServerFactory) Serve(l net.Listener, secure bool) error {
+ srv, err := s.create()
+ if err != nil {
+ return err
+ }
+
+ return srv.Serve(l)
+}
+
+// Stop stops all servers created by the factory.
+func (s *ServerFactory) Stop() {
+ for _, srv := range s.all() {
+ srv.Stop()
+ }
+}
+
+// GracefulStop stops both the secure and insecure servers gracefully.
+func (s *ServerFactory) GracefulStop() {
+ wg := sync.WaitGroup{}
+
+ for _, srv := range s.all() {
+ wg.Add(1)
+
+ go func(s *grpc.Server) {
+ s.GracefulStop()
+ wg.Done()
+ }(srv)
+ }
+
+ wg.Wait()
+}
+
+func (s *ServerFactory) create() (*grpc.Server, error) {
+ s.mtx.Lock()
+ defer s.mtx.Unlock()
+
+ s.insecure = append(s.insecure, s.createGRPC())
+
+ return s.insecure[len(s.insecure)-1], nil
+}
+
+func (s *ServerFactory) createGRPC(grpcOpts ...grpc.ServerOption) *grpc.Server {
+ return NewGRPCServer(
+ s.conf,
+ s.logger,
+ s.registry,
+ s.director,
+ s.nodeMgr,
+ s.txMgr,
+ s.queue,
+ grpcOpts...,
+ )
+}
+
+func (s *ServerFactory) all() []*grpc.Server {
+ s.mtx.Lock()
+ defer s.mtx.Unlock()
+
+ var servers []*grpc.Server
+
+ if s.insecure != nil {
+ servers = append(servers, s.insecure...)
+ }
+
+ return servers
+}
diff --git a/internal/praefect/server_factory_test.go b/internal/praefect/server_factory_test.go
new file mode 100644
index 000000000..0982b1eec
--- /dev/null
+++ b/internal/praefect/server_factory_test.go
@@ -0,0 +1,174 @@
+package praefect
+
+import (
+ "context"
+ "net"
+ "os"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/client"
+ "gitlab.com/gitlab-org/gitaly/internal/bootstrap/starter"
+ gconfig "gitlab.com/gitlab-org/gitaly/internal/config"
+ "gitlab.com/gitlab-org/gitaly/internal/helper/text"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/transactions"
+ "gitlab.com/gitlab-org/gitaly/internal/server"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "google.golang.org/grpc"
+ healthpb "google.golang.org/grpc/health/grpc_health_v1"
+)
+
+func TestServerFactory(t *testing.T) {
+ gitalyServerFactory := server.NewGitalyServerFactory(nil)
+ defer gitalyServerFactory.Stop()
+
+ // start gitaly serving on public endpoint
+ gitalyListener, err := net.Listen(starter.TCP, ":0")
+ require.NoError(t, err)
+ defer func() { require.NoError(t, gitalyListener.Close()) }()
+ go gitalyServerFactory.Serve(gitalyListener, false)
+
+ // start gitaly serving on internal endpoint
+ gitalyInternalSocketPath := gconfig.GitalyInternalSocketPath()
+ defer func() { require.NoError(t, os.RemoveAll(gitalyInternalSocketPath)) }()
+ gitalyInternalListener, err := net.Listen(starter.Unix, gitalyInternalSocketPath)
+ require.NoError(t, err)
+ defer func() { require.NoError(t, gitalyInternalListener.Close()) }()
+ go gitalyServerFactory.Serve(gitalyInternalListener, false)
+
+ gitalyAddr, err := starter.ComposeEndpoint(gitalyListener.Addr().Network(), gitalyListener.Addr().String())
+ require.NoError(t, err)
+
+ conf := config.Config{
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: "praefect",
+ Nodes: []*config.Node{
+ {
+ DefaultPrimary: true,
+ Storage: gconfig.Config.Storages[0].Name,
+ Address: gitalyAddr,
+ Token: gconfig.Config.Auth.Token,
+ },
+ },
+ },
+ },
+ }
+
+ repo, repoPath, cleanup := testhelper.NewTestRepo(t)
+ defer cleanup()
+ repo.StorageName = conf.VirtualStorages[0].Name // storage must be re-written to virtual to be properly redirected by praefect
+ revision := text.ChompBytes(testhelper.MustRunCommand(t, nil, "git", "-C", repoPath, "rev-parse", "HEAD"))
+
+ logger := testhelper.DiscardTestEntry(t)
+ queue := datastore.NewMemoryReplicationEventQueue(conf)
+ nodeMgr, err := nodes.NewManager(logger, conf, nil, queue, &promtest.MockHistogramVec{})
+ require.NoError(t, err)
+ txMgr := transactions.NewManager()
+ registry := protoregistry.GitalyProtoPreregistered
+ coordinator := NewCoordinator(queue, nodeMgr, txMgr, conf, registry)
+
+ checkOwnRegisteredServices := func(ctx context.Context, t *testing.T, cc *grpc.ClientConn) healthpb.HealthClient {
+ t.Helper()
+
+ healthClient := healthpb.NewHealthClient(cc)
+ resp, err := healthClient.Check(ctx, &healthpb.HealthCheckRequest{})
+ require.NoError(t, err)
+ require.Equal(t, healthpb.HealthCheckResponse_SERVING, resp.Status)
+ return healthClient
+ }
+
+ checkProxyingOntoGitaly := func(ctx context.Context, t *testing.T, cc *grpc.ClientConn) {
+ t.Helper()
+
+ commitClient := gitalypb.NewCommitServiceClient(cc)
+ resp, err := commitClient.CommitLanguages(ctx, &gitalypb.CommitLanguagesRequest{Repository: repo, Revision: []byte(revision)})
+ require.NoError(t, err)
+ require.Len(t, resp.Languages, 4)
+ }
+
+ t.Run("insecure", func(t *testing.T) {
+ praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, registry)
+ defer praefectServerFactory.Stop()
+
+ listener, err := net.Listen(starter.TCP, ":0")
+ require.NoError(t, err)
+ defer func() { require.NoError(t, listener.Close()) }()
+
+ go praefectServerFactory.Serve(listener, false)
+
+ praefectAddr, err := starter.ComposeEndpoint(listener.Addr().Network(), listener.Addr().String())
+ require.NoError(t, err)
+
+ cc, err := client.Dial(praefectAddr, nil)
+ require.NoError(t, err)
+ defer func() { require.NoError(t, cc.Close()) }()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ t.Run("handles registered RPCs", func(t *testing.T) {
+ checkOwnRegisteredServices(ctx, t, cc)
+ })
+
+ t.Run("proxies RPCs onto gitaly server", func(t *testing.T) {
+ checkProxyingOntoGitaly(ctx, t, cc)
+ })
+ })
+
+ t.Run("stops all listening servers", func(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ // start with tcp listener
+ praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, registry)
+ defer praefectServerFactory.Stop()
+
+ tcpListener, err := net.Listen(starter.TCP, ":0")
+ require.NoError(t, err)
+ defer tcpListener.Close()
+
+ go praefectServerFactory.Serve(tcpListener, false)
+
+ praefectTCPAddr, err := starter.ComposeEndpoint(tcpListener.Addr().Network(), tcpListener.Addr().String())
+ require.NoError(t, err)
+
+ tcpCC, err := client.Dial(praefectTCPAddr, nil)
+ require.NoError(t, err)
+ defer func() { require.NoError(t, tcpCC.Close()) }()
+
+ tcpHealthClient := checkOwnRegisteredServices(ctx, t, tcpCC)
+
+ // start with socket listener
+ socketPath := testhelper.GetTemporaryGitalySocketFileName()
+ defer func() { require.NoError(t, os.RemoveAll(socketPath)) }()
+ socketListener, err := net.Listen(starter.Unix, socketPath)
+ require.NoError(t, err)
+ defer socketListener.Close()
+
+ go praefectServerFactory.Serve(socketListener, false)
+
+ praefectSocketAddr, err := starter.ComposeEndpoint(socketListener.Addr().Network(), socketListener.Addr().String())
+ require.NoError(t, err)
+
+ socketCC, err := client.Dial(praefectSocketAddr, nil)
+ require.NoError(t, err)
+ defer func() { require.NoError(t, socketCC.Close()) }()
+
+ unixHealthClient := checkOwnRegisteredServices(ctx, t, socketCC)
+
+ praefectServerFactory.GracefulStop()
+
+ _, err = tcpHealthClient.Check(ctx, nil)
+ require.Error(t, err)
+
+ _, err = unixHealthClient.Check(ctx, nil)
+ require.Error(t, err)
+ })
+}