From 389ae3336772c9bbf7cd21f2f6aed262e8ae0357 Mon Sep 17 00:00:00 2001 From: Pavlo Strokov Date: Fri, 26 Jun 2020 11:16:27 +0000 Subject: Praefect: server factory introduction Introduces server factory for creating gRPC servers. Praefect gRPC server created by separate function and can be reused in tests to check routing. Part of: https://gitlab.com/gitlab-org/gitaly/-/issues/1698 --- internal/bootstrap/starter/starter.go | 7 +- internal/bootstrap/starter/starter_test.go | 5 +- internal/praefect/auth_test.go | 10 +- internal/praefect/config/config_test.go | 6 +- internal/praefect/helper_test.go | 22 ++-- internal/praefect/replicator_test.go | 11 +- internal/praefect/server.go | 123 +++++++------------- internal/praefect/server_factory.go | 117 +++++++++++++++++++ internal/praefect/server_factory_test.go | 174 +++++++++++++++++++++++++++++ 9 files changed, 362 insertions(+), 113 deletions(-) create mode 100644 internal/praefect/server_factory.go create mode 100644 internal/praefect/server_factory_test.go (limited to 'internal') diff --git a/internal/bootstrap/starter/starter.go b/internal/bootstrap/starter/starter.go index 7c43781af..a8b7c018c 100644 --- a/internal/bootstrap/starter/starter.go +++ b/internal/bootstrap/starter/starter.go @@ -23,7 +23,8 @@ const ( ) var ( - errEmptySchema = errors.New("empty schema can't be used") + // ErrEmptySchema signals that the address has no schema in it. + ErrEmptySchema = errors.New("empty schema can't be used") errEmptyAddress = errors.New("empty address can't be used") ) @@ -36,7 +37,7 @@ func ParseEndpoint(endpoint string) (Config, error) { parts := strings.Split(endpoint, separator) if len(parts) != 2 { - return Config{}, fmt.Errorf("unsupported format: %q", endpoint) + return Config{}, fmt.Errorf("unsupported format: %q: %w", endpoint, ErrEmptySchema) } if err := verifySchema(parts[0]); err != nil { @@ -65,7 +66,7 @@ func ComposeEndpoint(schema, address string) (string, error) { func verifySchema(schema string) error { switch schema { case "": - return errEmptySchema + return ErrEmptySchema case TCP, TLS, Unix: return nil default: diff --git a/internal/bootstrap/starter/starter_test.go b/internal/bootstrap/starter/starter_test.go index 7347d0e13..c5d84b1f8 100644 --- a/internal/bootstrap/starter/starter_test.go +++ b/internal/bootstrap/starter/starter_test.go @@ -2,6 +2,7 @@ package starter import ( "errors" + "fmt" "testing" "github.com/stretchr/testify/require" @@ -110,12 +111,12 @@ func TestParseEndpoint(t *testing.T) { { desc: "no schema", addr: "://127.0.0.1:2306", - expErr: errEmptySchema, + expErr: ErrEmptySchema, }, { desc: "bad format", addr: "127.0.0.1:2306", - expErr: errors.New(`unsupported format: "127.0.0.1:2306"`), + expErr: fmt.Errorf(`unsupported format: "127.0.0.1:2306": %w`, ErrEmptySchema), }, { desc: "tcp schema addresses", diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go index fe5922701..b112a716f 100644 --- a/internal/praefect/auth_test.go +++ b/internal/praefect/auth_test.go @@ -50,7 +50,7 @@ func TestAuthFailures(t *testing.T) { for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { srv, serverSocketPath, cleanup := runServer(t, "quxbaz", true) - defer srv.Shutdown(ctx) + defer srv.Stop() defer cleanup() connOpts := append(tc.opts, grpc.WithInsecure()) @@ -103,7 +103,7 @@ func TestAuthSuccess(t *testing.T) { for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { srv, serverSocketPath, cleanup := runServer(t, tc.token, tc.required) - defer srv.Shutdown(ctx) + defer srv.Stop() defer cleanup() connOpts := append(tc.opts, grpc.WithInsecure()) @@ -133,7 +133,7 @@ func dial(serverSocketPath string, opts []grpc.DialOption) (*grpc.ClientConn, er return grpc.Dial(serverSocketPath, opts...) } -func runServer(t *testing.T, token string, required bool) (*Server, string, func()) { +func runServer(t *testing.T, token string, required bool) (*grpc.Server, string, func()) { backendToken := "abcxyz" mockServer := &mockSvc{ serverAccessor: func(_ context.Context, req *mock.SimpleRequest) (*mock.SimpleResponse, error) { @@ -180,13 +180,13 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func coordinator := NewCoordinator(queue, nodeMgr, txMgr, conf, registry) - srv := NewServer(coordinator.StreamDirector, logEntry, registry, conf) + srv := NewGRPCServer(conf, logEntry, registry, coordinator.StreamDirector, nodeMgr, txMgr, queue) serverSocketPath := testhelper.GetTemporaryGitalySocketFileName() listener, err := net.Listen("unix", serverSocketPath) require.NoError(t, err) - go srv.Serve(listener, false) + go srv.Serve(listener) return srv, "unix://" + serverSocketPath, cleanup } diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go index 1944727f0..a06ce8e03 100644 --- a/internal/praefect/config/config_test.go +++ b/internal/praefect/config/config_test.go @@ -6,7 +6,6 @@ import ( "testing" "time" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/internal/config" "gitlab.com/gitlab-org/gitaly/internal/config/log" @@ -201,11 +200,12 @@ func TestConfigValidation(t *testing.T) { t.Run(tc.desc, func(t *testing.T) { err := tc.config.Validate() if tc.errMsg == "" { - assert.NoError(t, err) + require.NoError(t, err) return } - assert.Contains(t, err.Error(), tc.errMsg) + require.Error(t, err) + require.Contains(t, err.Error(), tc.errMsg) }) } } diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go index 917c32de4..d4a41499b 100644 --- a/internal/praefect/helper_test.go +++ b/internal/praefect/helper_test.go @@ -89,7 +89,7 @@ func assertPrimariesExist(t testing.TB, conf config.Config) { // config.Nodes. There must be a 1-to-1 mapping between backend server and // configured storage node. // requires there to be only 1 virtual storage -func runPraefectServerWithMock(t *testing.T, conf config.Config, queue datastore.ReplicationEventQueue, backends map[string]mock.SimpleServiceServer) (*grpc.ClientConn, *Server, testhelper.Cleanup) { +func runPraefectServerWithMock(t *testing.T, conf config.Config, queue datastore.ReplicationEventQueue, backends map[string]mock.SimpleServiceServer) (*grpc.ClientConn, *grpc.Server, testhelper.Cleanup) { r, err := protoregistry.New(mustLoadProtoReg(t)) require.NoError(t, err) @@ -182,13 +182,13 @@ func withRealGitalyShared(t testing.TB) func([]*config.VirtualStorage) []testhel } } -func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.ClientConn, *Server, testhelper.Cleanup) { +func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.ClientConn, *grpc.Server, testhelper.Cleanup) { return runPraefectServerWithGitalyWithDatastore(t, conf, defaultQueue(conf)) } // runPraefectServerWithGitaly runs a praefect server with actual Gitaly nodes // requires exactly 1 virtual storage -func runPraefectServerWithGitalyWithDatastore(t *testing.T, conf config.Config, queue datastore.ReplicationEventQueue) (*grpc.ClientConn, *Server, testhelper.Cleanup) { +func runPraefectServerWithGitalyWithDatastore(t *testing.T, conf config.Config, queue datastore.ReplicationEventQueue) (*grpc.ClientConn, *grpc.Server, testhelper.Cleanup) { return runPraefectServer(t, conf, buildOptions{ withQueue: queue, withTxMgr: transactions.NewManager(), @@ -211,7 +211,7 @@ func defaultNodeMgr(t testing.TB, conf config.Config, queue datastore.Replicatio return nodeMgr } -func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grpc.ClientConn, *Server, testhelper.Cleanup) { +func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grpc.ClientConn, *grpc.Server, testhelper.Cleanup) { assertPrimariesExist(t, conf) var cleanups []testhelper.Cleanup @@ -250,7 +250,8 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp opt.withQueue, opt.withNodeMgr, ) - prf := NewServer(coordinator.StreamDirector, opt.withLogger, opt.withAnnotations, conf) + + prf := NewGRPCServer(conf, opt.withLogger, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, opt.withNodeMgr, opt.withTxMgr, opt.withQueue) listener, port := listenAvailPort(t) t.Logf("proxy listening on port %d", port) @@ -258,8 +259,7 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp errQ := make(chan error) ctx, cancel := testhelper.Context() - prf.RegisterServices(opt.withNodeMgr, opt.withTxMgr, conf, opt.withQueue) - go func() { errQ <- prf.Serve(listener, false) }() + go func() { errQ <- prf.Serve(listener) }() replmgr.ProcessBacklog(ctx, noopBackoffFunc) // dial client to praefect @@ -270,9 +270,7 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp cu() } - ctx, timed := context.WithTimeout(ctx, time.Second) - defer timed() - require.NoError(t, prf.Shutdown(ctx)) + prf.Stop() cancel() require.Error(t, context.Canceled, <-errQ) @@ -290,7 +288,7 @@ type partialGitaly interface { healthpb.HealthServer } -func registerServices(server *grpc.Server, pg partialGitaly) { +func registerGitalyServices(server *grpc.Server, pg partialGitaly) { gitalypb.RegisterServerServiceServer(server, pg) gitalypb.RegisterRepositoryServiceServer(server, pg) gitalypb.RegisterInternalGitalyServer(server, pg) @@ -325,7 +323,7 @@ func runInternalGitalyServer(t testing.TB, storages []gconfig.Storage, token str internalListener, err := net.Listen("unix", internalSocketPath) require.NoError(t, err) - registerServices(server, realGitaly(storages, token, internalSocketPath)) + registerGitalyServices(server, realGitaly(storages, token, internalSocketPath)) errQ := make(chan error) diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 55e2942cb..425299ca4 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -225,18 +225,13 @@ func TestPropagateReplicationJob(t *testing.T) { replmgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queue, nodeMgr) - prf := NewServer( - coordinator.StreamDirector, - logEntry, - protoregistry.GitalyProtoPreregistered, - conf, - ) + prf := NewGRPCServer(conf, logEntry, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, nodeMgr, txMgr, queue) + listener, port := listenAvailPort(t) ctx, cancel := testhelper.Context() defer cancel() - prf.RegisterServices(nodeMgr, txMgr, conf, queue) - go prf.Serve(listener, false) + go prf.Serve(listener) defer prf.Stop() cc := dialLocalPort(t, port, false) diff --git a/internal/praefect/server.go b/internal/praefect/server.go index 229d30200..08f5aac4f 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -3,9 +3,6 @@ calls to a set of Gitaly services.*/ package praefect import ( - "context" - "net" - grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" @@ -35,35 +32,18 @@ import ( healthpb "google.golang.org/grpc/health/grpc_health_v1" ) -// Server is a praefect server -type Server struct { - s *grpc.Server - l *logrus.Entry -} - -func (srv *Server) warnDupeAddrs(c config.Config) { - var fishy bool - - for _, virtualStorage := range c.VirtualStorages { - addrSet := map[string]struct{}{} - for _, n := range virtualStorage.Nodes { - _, ok := addrSet[n.Address] - if ok { - srv.l.Warnf("more than one backend node is hosted at %s", n.Address) - fishy = true - continue - } - addrSet[n.Address] = struct{}{} - } - if fishy { - srv.l.Warnf("your Praefect configuration may not offer actual redundancy") - } - } -} - -// NewServer returns an initialized praefect gPRC proxy server configured -// with the provided gRPC server options -func NewServer(director proxy.StreamDirector, l *logrus.Entry, r *protoregistry.Registry, conf config.Config, grpcOpts ...grpc.ServerOption) *Server { +// NewGRPCServer returns gRPC server with registered proxy-handler and actual services praefect serves on its own. +// It includes a set of unary and stream interceptors required to add logging, authentication, etc. +func NewGRPCServer( + conf config.Config, + logger *logrus.Entry, + registry *protoregistry.Registry, + director proxy.StreamDirector, + nodeMgr nodes.Manager, + txMgr *transactions.Manager, + queue datastore.ReplicationEventQueue, + grpcOpts ...grpc.ServerOption, +) *grpc.Server { ctxTagOpts := []grpc_ctxtags.Option{ grpc_ctxtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor), } @@ -73,10 +53,10 @@ func NewServer(director proxy.StreamDirector, l *logrus.Entry, r *protoregistry. grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( grpc_ctxtags.StreamServerInterceptor(ctxTagOpts...), grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler - middleware.MethodTypeStreamInterceptor(r), + middleware.MethodTypeStreamInterceptor(registry), metadatahandler.StreamInterceptor, grpc_prometheus.StreamServerInterceptor, - grpc_logrus.StreamServerInterceptor(l), + grpc_logrus.StreamServerInterceptor(logger), sentryhandler.StreamLogHandler, cancelhandler.Stream, // Should be below LogHandler grpctracing.StreamServerTracingInterceptor(), @@ -88,10 +68,10 @@ func NewServer(director proxy.StreamDirector, l *logrus.Entry, r *protoregistry. grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( grpc_ctxtags.UnaryServerInterceptor(ctxTagOpts...), grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler - middleware.MethodTypeUnaryInterceptor(r), + middleware.MethodTypeUnaryInterceptor(registry), metadatahandler.UnaryInterceptor, grpc_prometheus.UnaryServerInterceptor, - grpc_logrus.UnaryServerInterceptor(l), + grpc_logrus.UnaryServerInterceptor(logger), sentryhandler.UnaryLogHandler, cancelhandler.Unary, // Should be below LogHandler grpctracing.UnaryServerTracingInterceptor(), @@ -102,14 +82,11 @@ func NewServer(director proxy.StreamDirector, l *logrus.Entry, r *protoregistry. )), }...) - s := &Server{ - s: grpc.NewServer(grpcOpts...), - l: l, - } + warnDupeAddrs(logger, conf) - s.warnDupeAddrs(conf) - - return s + srv := grpc.NewServer(grpcOpts...) + registerServices(srv, nodeMgr, txMgr, conf, queue) + return srv } func proxyRequiredOpts(director proxy.StreamDirector) []grpc.ServerOption { @@ -119,47 +96,33 @@ func proxyRequiredOpts(director proxy.StreamDirector) []grpc.ServerOption { } } -// Serve starts serving requests from the listener -func (srv *Server) Serve(l net.Listener, secure bool) error { - return srv.s.Serve(l) -} - -// RegisterServices will register any services praefect needs to handle rpcs on its own -func (srv *Server) RegisterServices(nm nodes.Manager, tm *transactions.Manager, conf config.Config, queue datastore.ReplicationEventQueue) { +// registerServices registers services praefect needs to handle RPCs on its own. +func registerServices(srv *grpc.Server, nm nodes.Manager, tm *transactions.Manager, conf config.Config, queue datastore.ReplicationEventQueue) { // ServerServiceServer is necessary for the ServerInfo RPC - gitalypb.RegisterServerServiceServer(srv.s, server.NewServer(conf, nm)) - gitalypb.RegisterPraefectInfoServiceServer(srv.s, info.NewServer(nm, conf, queue)) - gitalypb.RegisterRefTransactionServer(srv.s, transaction.NewServer(tm)) - healthpb.RegisterHealthServer(srv.s, health.NewServer()) + gitalypb.RegisterServerServiceServer(srv, server.NewServer(conf, nm)) + gitalypb.RegisterPraefectInfoServiceServer(srv, info.NewServer(nm, conf, queue)) + gitalypb.RegisterRefTransactionServer(srv, transaction.NewServer(tm)) + healthpb.RegisterHealthServer(srv, health.NewServer()) - grpc_prometheus.Register(srv.s) + grpc_prometheus.Register(srv) } -// Shutdown will attempt a graceful shutdown of the grpc server. If unable -// to gracefully shutdown within the context deadline, it will then -// forcefully shutdown the server and return a context cancellation error. -func (srv *Server) Shutdown(ctx context.Context) error { - done := make(chan struct{}) - go func() { - srv.s.GracefulStop() - close(done) - }() +func warnDupeAddrs(logger logrus.FieldLogger, conf config.Config) { + var fishy bool - select { - case <-ctx.Done(): - srv.s.Stop() - return ctx.Err() - case <-done: - return nil + for _, virtualStorage := range conf.VirtualStorages { + addrSet := map[string]struct{}{} + for _, n := range virtualStorage.Nodes { + _, ok := addrSet[n.Address] + if ok { + logger.Warnf("more than one backend node is hosted at %s", n.Address) + fishy = true + continue + } + addrSet[n.Address] = struct{}{} + } + if fishy { + logger.Warnf("your Praefect configuration may not offer actual redundancy") + } } } - -// GracefulStop stops the praefect server gracefully -func (srv *Server) GracefulStop() { - srv.s.GracefulStop() -} - -// Stop stops the praefect server -func (srv *Server) Stop() { - srv.s.Stop() -} diff --git a/internal/praefect/server_factory.go b/internal/praefect/server_factory.go new file mode 100644 index 000000000..90309380e --- /dev/null +++ b/internal/praefect/server_factory.go @@ -0,0 +1,117 @@ +package praefect + +import ( + "net" + "sync" + + "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" + "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" + "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" + "gitlab.com/gitlab-org/gitaly/internal/praefect/transactions" + "google.golang.org/grpc" +) + +// NewServerFactory returns factory object for initialization of praefect gRPC servers. +func NewServerFactory( + conf config.Config, + logger *logrus.Entry, + director proxy.StreamDirector, + nodeMgr nodes.Manager, + txMgr *transactions.Manager, + queue datastore.ReplicationEventQueue, + registry *protoregistry.Registry, +) *ServerFactory { + return &ServerFactory{ + conf: conf, + logger: logger, + director: director, + nodeMgr: nodeMgr, + txMgr: txMgr, + queue: queue, + registry: registry, + } +} + +// ServerFactory is a factory of praefect grpc servers +type ServerFactory struct { + mtx sync.Mutex + conf config.Config + logger *logrus.Entry + director proxy.StreamDirector + nodeMgr nodes.Manager + txMgr *transactions.Manager + queue datastore.ReplicationEventQueue + registry *protoregistry.Registry + insecure []*grpc.Server +} + +// Serve starts serving on the provided listener with newly created grpc.Server +func (s *ServerFactory) Serve(l net.Listener, secure bool) error { + srv, err := s.create() + if err != nil { + return err + } + + return srv.Serve(l) +} + +// Stop stops all servers created by the factory. +func (s *ServerFactory) Stop() { + for _, srv := range s.all() { + srv.Stop() + } +} + +// GracefulStop stops both the secure and insecure servers gracefully. +func (s *ServerFactory) GracefulStop() { + wg := sync.WaitGroup{} + + for _, srv := range s.all() { + wg.Add(1) + + go func(s *grpc.Server) { + s.GracefulStop() + wg.Done() + }(srv) + } + + wg.Wait() +} + +func (s *ServerFactory) create() (*grpc.Server, error) { + s.mtx.Lock() + defer s.mtx.Unlock() + + s.insecure = append(s.insecure, s.createGRPC()) + + return s.insecure[len(s.insecure)-1], nil +} + +func (s *ServerFactory) createGRPC(grpcOpts ...grpc.ServerOption) *grpc.Server { + return NewGRPCServer( + s.conf, + s.logger, + s.registry, + s.director, + s.nodeMgr, + s.txMgr, + s.queue, + grpcOpts..., + ) +} + +func (s *ServerFactory) all() []*grpc.Server { + s.mtx.Lock() + defer s.mtx.Unlock() + + var servers []*grpc.Server + + if s.insecure != nil { + servers = append(servers, s.insecure...) + } + + return servers +} diff --git a/internal/praefect/server_factory_test.go b/internal/praefect/server_factory_test.go new file mode 100644 index 000000000..0982b1eec --- /dev/null +++ b/internal/praefect/server_factory_test.go @@ -0,0 +1,174 @@ +package praefect + +import ( + "context" + "net" + "os" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/client" + "gitlab.com/gitlab-org/gitaly/internal/bootstrap/starter" + gconfig "gitlab.com/gitlab-org/gitaly/internal/config" + "gitlab.com/gitlab-org/gitaly/internal/helper/text" + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" + "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" + "gitlab.com/gitlab-org/gitaly/internal/praefect/transactions" + "gitlab.com/gitlab-org/gitaly/internal/server" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "google.golang.org/grpc" + healthpb "google.golang.org/grpc/health/grpc_health_v1" +) + +func TestServerFactory(t *testing.T) { + gitalyServerFactory := server.NewGitalyServerFactory(nil) + defer gitalyServerFactory.Stop() + + // start gitaly serving on public endpoint + gitalyListener, err := net.Listen(starter.TCP, ":0") + require.NoError(t, err) + defer func() { require.NoError(t, gitalyListener.Close()) }() + go gitalyServerFactory.Serve(gitalyListener, false) + + // start gitaly serving on internal endpoint + gitalyInternalSocketPath := gconfig.GitalyInternalSocketPath() + defer func() { require.NoError(t, os.RemoveAll(gitalyInternalSocketPath)) }() + gitalyInternalListener, err := net.Listen(starter.Unix, gitalyInternalSocketPath) + require.NoError(t, err) + defer func() { require.NoError(t, gitalyInternalListener.Close()) }() + go gitalyServerFactory.Serve(gitalyInternalListener, false) + + gitalyAddr, err := starter.ComposeEndpoint(gitalyListener.Addr().Network(), gitalyListener.Addr().String()) + require.NoError(t, err) + + conf := config.Config{ + VirtualStorages: []*config.VirtualStorage{ + { + Name: "praefect", + Nodes: []*config.Node{ + { + DefaultPrimary: true, + Storage: gconfig.Config.Storages[0].Name, + Address: gitalyAddr, + Token: gconfig.Config.Auth.Token, + }, + }, + }, + }, + } + + repo, repoPath, cleanup := testhelper.NewTestRepo(t) + defer cleanup() + repo.StorageName = conf.VirtualStorages[0].Name // storage must be re-written to virtual to be properly redirected by praefect + revision := text.ChompBytes(testhelper.MustRunCommand(t, nil, "git", "-C", repoPath, "rev-parse", "HEAD")) + + logger := testhelper.DiscardTestEntry(t) + queue := datastore.NewMemoryReplicationEventQueue(conf) + nodeMgr, err := nodes.NewManager(logger, conf, nil, queue, &promtest.MockHistogramVec{}) + require.NoError(t, err) + txMgr := transactions.NewManager() + registry := protoregistry.GitalyProtoPreregistered + coordinator := NewCoordinator(queue, nodeMgr, txMgr, conf, registry) + + checkOwnRegisteredServices := func(ctx context.Context, t *testing.T, cc *grpc.ClientConn) healthpb.HealthClient { + t.Helper() + + healthClient := healthpb.NewHealthClient(cc) + resp, err := healthClient.Check(ctx, &healthpb.HealthCheckRequest{}) + require.NoError(t, err) + require.Equal(t, healthpb.HealthCheckResponse_SERVING, resp.Status) + return healthClient + } + + checkProxyingOntoGitaly := func(ctx context.Context, t *testing.T, cc *grpc.ClientConn) { + t.Helper() + + commitClient := gitalypb.NewCommitServiceClient(cc) + resp, err := commitClient.CommitLanguages(ctx, &gitalypb.CommitLanguagesRequest{Repository: repo, Revision: []byte(revision)}) + require.NoError(t, err) + require.Len(t, resp.Languages, 4) + } + + t.Run("insecure", func(t *testing.T) { + praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, registry) + defer praefectServerFactory.Stop() + + listener, err := net.Listen(starter.TCP, ":0") + require.NoError(t, err) + defer func() { require.NoError(t, listener.Close()) }() + + go praefectServerFactory.Serve(listener, false) + + praefectAddr, err := starter.ComposeEndpoint(listener.Addr().Network(), listener.Addr().String()) + require.NoError(t, err) + + cc, err := client.Dial(praefectAddr, nil) + require.NoError(t, err) + defer func() { require.NoError(t, cc.Close()) }() + + ctx, cancel := testhelper.Context() + defer cancel() + + t.Run("handles registered RPCs", func(t *testing.T) { + checkOwnRegisteredServices(ctx, t, cc) + }) + + t.Run("proxies RPCs onto gitaly server", func(t *testing.T) { + checkProxyingOntoGitaly(ctx, t, cc) + }) + }) + + t.Run("stops all listening servers", func(t *testing.T) { + ctx, cancel := testhelper.Context() + defer cancel() + + // start with tcp listener + praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, registry) + defer praefectServerFactory.Stop() + + tcpListener, err := net.Listen(starter.TCP, ":0") + require.NoError(t, err) + defer tcpListener.Close() + + go praefectServerFactory.Serve(tcpListener, false) + + praefectTCPAddr, err := starter.ComposeEndpoint(tcpListener.Addr().Network(), tcpListener.Addr().String()) + require.NoError(t, err) + + tcpCC, err := client.Dial(praefectTCPAddr, nil) + require.NoError(t, err) + defer func() { require.NoError(t, tcpCC.Close()) }() + + tcpHealthClient := checkOwnRegisteredServices(ctx, t, tcpCC) + + // start with socket listener + socketPath := testhelper.GetTemporaryGitalySocketFileName() + defer func() { require.NoError(t, os.RemoveAll(socketPath)) }() + socketListener, err := net.Listen(starter.Unix, socketPath) + require.NoError(t, err) + defer socketListener.Close() + + go praefectServerFactory.Serve(socketListener, false) + + praefectSocketAddr, err := starter.ComposeEndpoint(socketListener.Addr().Network(), socketListener.Addr().String()) + require.NoError(t, err) + + socketCC, err := client.Dial(praefectSocketAddr, nil) + require.NoError(t, err) + defer func() { require.NoError(t, socketCC.Close()) }() + + unixHealthClient := checkOwnRegisteredServices(ctx, t, socketCC) + + praefectServerFactory.GracefulStop() + + _, err = tcpHealthClient.Check(ctx, nil) + require.Error(t, err) + + _, err = unixHealthClient.Check(ctx, nil) + require.Error(t, err) + }) +} -- cgit v1.2.3