diff options
author | John Cai <jcai@gitlab.com> | 2019-11-18 22:38:59 +0300 |
---|---|---|
committer | jramsay <jcai@gitlab.com> | 2019-11-27 03:22:12 +0300 |
commit | 7c4f16371b8f051f1eb3a7e989d26ea7eb962a8e (patch) | |
tree | e3e158da21a79a4ca2b4feeade654505d326785a | |
parent | 82d31745e0901dc91c2fe02f44ff8ff659a22c09 (diff) |
Wire in bootstrap package for praefect for zero downtime deploy
-rw-r--r-- | changelogs/unreleased/jc-zero-downtime-praefect.yml | 5 | ||||
-rw-r--r-- | cmd/gitaly/main.go | 15 | ||||
-rw-r--r-- | cmd/gitaly/starter_config.go | 45 | ||||
-rw-r--r-- | cmd/praefect/main.go | 83 | ||||
-rw-r--r-- | internal/bootstrap/server_factory.go | 28 | ||||
-rw-r--r-- | internal/bootstrap/starter/starter.go | 50 | ||||
-rw-r--r-- | internal/bootstrap/starter/starter_test.go (renamed from cmd/gitaly/starter_config_test.go) | 6 | ||||
-rw-r--r-- | internal/praefect/auth_test.go | 2 | ||||
-rw-r--r-- | internal/praefect/config/config.go | 7 | ||||
-rw-r--r-- | internal/praefect/helper_test.go | 6 | ||||
-rw-r--r-- | internal/praefect/server.go | 24 |
11 files changed, 145 insertions, 126 deletions
diff --git a/changelogs/unreleased/jc-zero-downtime-praefect.yml b/changelogs/unreleased/jc-zero-downtime-praefect.yml new file mode 100644 index 000000000..f8f922677 --- /dev/null +++ b/changelogs/unreleased/jc-zero-downtime-praefect.yml @@ -0,0 +1,5 @@ +--- +title: Leverage the bootstrap package to support Praefect zero downtime deploys +merge_request: 1638 +author: +type: other diff --git a/cmd/gitaly/main.go b/cmd/gitaly/main.go index b6b819b60..624c08747 100644 --- a/cmd/gitaly/main.go +++ b/cmd/gitaly/main.go @@ -7,6 +7,7 @@ import ( log "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/internal/bootstrap" + "gitlab.com/gitlab-org/gitaly/internal/bootstrap/starter" "gitlab.com/gitlab-org/gitaly/internal/config" "gitlab.com/gitlab-org/gitaly/internal/config/sentry" "gitlab.com/gitlab-org/gitaly/internal/git" @@ -84,21 +85,21 @@ func main() { // Inside here we can use deferred functions. This is needed because // log.Fatal bypasses deferred functions. func run(b *bootstrap.Bootstrap) error { - servers := bootstrap.NewServerFactory() + servers := bootstrap.NewGitalyServerFactory() defer servers.Stop() b.StopAction = servers.GracefulStop - for _, c := range []starterConfig{ - {unix, config.Config.SocketPath}, - {tcp, config.Config.ListenAddr}, - {tls, config.Config.TLSListenAddr}, + for _, c := range []starter.Config{ + {starter.Unix, config.Config.SocketPath}, + {starter.TCP, config.Config.ListenAddr}, + {starter.TLS, config.Config.TLSListenAddr}, } { - if c.addr == "" { + if c.Addr == "" { continue } - b.RegisterStarter(gitalyStarter(c, servers)) + b.RegisterStarter(starter.New(c, servers)) } if addr := config.Config.PrometheusListenAddr; addr != "" { diff --git a/cmd/gitaly/starter_config.go b/cmd/gitaly/starter_config.go deleted file mode 100644 index 889855815..000000000 --- a/cmd/gitaly/starter_config.go +++ /dev/null @@ -1,45 +0,0 @@ -package main - -import ( - "github.com/sirupsen/logrus" - "gitlab.com/gitlab-org/gitaly/internal/bootstrap" -) - -const ( - tcp string = "tcp" - tls string = "tls" - unix string = "unix" -) - -type starterConfig struct { - name, addr string -} - -func (s *starterConfig) isSecure() bool { - return s.name == tls -} - -func (s *starterConfig) family() string { - if s.isSecure() { - return tcp - } - - return s.name -} - -func gitalyStarter(cfg starterConfig, servers bootstrap.GracefulStoppableServer) bootstrap.Starter { - return func(listen bootstrap.ListenFunc, errCh chan<- error) error { - l, err := listen(cfg.family(), cfg.addr) - if err != nil { - return err - } - - logrus.WithField("address", cfg.addr).Infof("listening at %s address", cfg.name) - - go func() { - errCh <- servers.Serve(l, cfg.isSecure()) - }() - - return nil - } -} diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index 271041e8c..8fb6c0c9c 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -5,13 +5,11 @@ import ( "errors" "flag" "fmt" - "net" "os" - "os/signal" "strings" - "syscall" - "time" + "gitlab.com/gitlab-org/gitaly/internal/bootstrap" + "gitlab.com/gitlab-org/gitaly/internal/bootstrap/starter" "gitlab.com/gitlab-org/gitaly/internal/config/sentry" "gitlab.com/gitlab-org/gitaly/internal/log" "gitlab.com/gitlab-org/gitaly/internal/praefect" @@ -46,12 +44,12 @@ func main() { logger.Fatal(err) } - listeners, err := getListeners(conf.SocketPath, conf.ListenAddr) + starterConfigs, err := getStarterConfigs(conf.SocketPath, conf.ListenAddr) if err != nil { logger.Fatalf("%s", err) } - if err := run(listeners, conf); err != nil { + if err := run(starterConfigs, conf); err != nil { logger.Fatalf("%v", err) } } @@ -94,7 +92,7 @@ func configure() (config.Config, error) { return conf, nil } -func run(listeners []net.Listener, conf config.Config) error { +func run(cfgs []starter.Config, conf config.Config) error { clientConnections := conn.NewClientConnections() for _, virtualStorage := range conf.VirtualStorages { @@ -112,74 +110,63 @@ func run(listeners []net.Listener, conf config.Config) error { var ( // top level server dependencies - ds = datastore.NewInMemory(conf) - coordinator = praefect.NewCoordinator(logger, ds, clientConnections, conf, protoregistry.GitalyProtoFileDescriptors...) - repl = praefect.NewReplMgr("default", logger, ds, clientConnections) - srv = praefect.NewServer(coordinator, repl, nil, logger, clientConnections, conf) - // signal related - signals = []os.Signal{syscall.SIGTERM, syscall.SIGINT} - termCh = make(chan os.Signal, len(signals)) + ds = datastore.NewInMemory(conf) + coordinator = praefect.NewCoordinator(logger, ds, clientConnections, conf, protoregistry.GitalyProtoFileDescriptors...) + repl = praefect.NewReplMgr("default", logger, ds, clientConnections) + srv = praefect.NewServer(coordinator, repl, nil, logger, clientConnections, conf) serverErrors = make(chan error, 1) ) - signal.Notify(termCh, signals...) - - for _, l := range listeners { - go func(lis net.Listener) { serverErrors <- srv.Start(lis) }(l) - } - ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go func() { serverErrors <- repl.ProcessBacklog(ctx) }() + _, isWrapped := os.LookupEnv(config.EnvUpgradesEnabled) - go coordinator.FailoverRotation() + b, err := bootstrap.New(os.Getenv(config.EnvPidFile), isWrapped) + if err != nil { + return fmt.Errorf("unable to create a bootstrap: %v", err) + } - select { - case s := <-termCh: - logger.WithField("signal", s).Warn("received signal, shutting down gracefully") - cancel() // cancels the replicator job processing + srv.RegisterServices() - ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) - if shutdownErr := srv.Shutdown(ctx); shutdownErr != nil { - logger.Warnf("error received during shutting down: %v", shutdownErr) - return shutdownErr - } - case err := <-serverErrors: - return err + b.StopAction = srv.GracefulStop + for _, cfg := range cfgs { + b.RegisterStarter(starter.New(cfg, srv)) } - return nil -} + if err := b.Start(); err != nil { + return fmt.Errorf("unable to start the bootstrap: %v", err) + } -func getListeners(socketPath, listenAddr string) ([]net.Listener, error) { - var listeners []net.Listener + go func() { serverErrors <- b.Wait() }() + go func() { serverErrors <- repl.ProcessBacklog(ctx) }() + + go coordinator.FailoverRotation() + + return <-serverErrors +} +func getStarterConfigs(socketPath, listenAddr string) ([]starter.Config, error) { + var cfgs []starter.Config if socketPath != "" { if err := os.RemoveAll(socketPath); err != nil { return nil, err } cleanPath := strings.TrimPrefix(socketPath, "unix:") - l, err := net.Listen("unix", cleanPath) - if err != nil { - return nil, err - } - listeners = append(listeners, l) + cfgs = append(cfgs, starter.Config{Name: starter.Unix, Addr: cleanPath}) logger.WithField("address", socketPath).Info("listening on unix socket") } if listenAddr != "" { - l, err := net.Listen("tcp", listenAddr) - if err != nil { - return nil, err - } + cleanAddr := strings.TrimPrefix(listenAddr, "tcp://") + + cfgs = append(cfgs, starter.Config{Name: starter.TCP, Addr: cleanAddr}) - listeners = append(listeners, l) logger.WithField("address", listenAddr).Info("listening at tcp address") } - return listeners, nil + return cfgs, nil } diff --git a/internal/bootstrap/server_factory.go b/internal/bootstrap/server_factory.go index bb26a59d4..ca6d870d1 100644 --- a/internal/bootstrap/server_factory.go +++ b/internal/bootstrap/server_factory.go @@ -9,7 +9,8 @@ import ( "google.golang.org/grpc" ) -type serverFactory struct { +// GitalyServerFactory is a factory of gitaly grpc servers +type GitalyServerFactory struct { ruby *rubyserver.Server secure, insecure *grpc.Server } @@ -19,17 +20,20 @@ type GracefulStoppableServer interface { GracefulStop() Stop() Serve(l net.Listener, secure bool) error - StartRuby() error } -// NewServerFactory initializes a rubyserver and then lazily initializes both secure and insecure grpc.Server -func NewServerFactory() GracefulStoppableServer { - return &serverFactory{ruby: &rubyserver.Server{}} +// NewGitalyServerFactory initializes a rubyserver and then lazily initializes both secure and insecure grpc.Server +func NewGitalyServerFactory() *GitalyServerFactory { + return &GitalyServerFactory{ruby: &rubyserver.Server{}} } -func (s *serverFactory) StartRuby() error { return s.ruby.Start() } +// StartRuby starts the ruby process +func (s *GitalyServerFactory) StartRuby() error { + return s.ruby.Start() +} -func (s *serverFactory) Stop() { +// Stop stops both the secure and insecure servers +func (s *GitalyServerFactory) Stop() { for _, srv := range s.all() { srv.Stop() } @@ -37,7 +41,8 @@ func (s *serverFactory) Stop() { s.ruby.Stop() } -func (s *serverFactory) GracefulStop() { +// GracefulStop stops both the secure and insecure servers gracefully +func (s *GitalyServerFactory) GracefulStop() { wg := sync.WaitGroup{} for _, srv := range s.all() { @@ -52,13 +57,14 @@ func (s *serverFactory) GracefulStop() { wg.Wait() } -func (s *serverFactory) Serve(l net.Listener, secure bool) error { +// Serve starts serving the listener +func (s *GitalyServerFactory) Serve(l net.Listener, secure bool) error { srv := s.get(secure) return srv.Serve(l) } -func (s *serverFactory) get(secure bool) *grpc.Server { +func (s *GitalyServerFactory) get(secure bool) *grpc.Server { if secure { if s.secure == nil { s.secure = server.NewSecure(s.ruby) @@ -74,7 +80,7 @@ func (s *serverFactory) get(secure bool) *grpc.Server { return s.insecure } -func (s *serverFactory) all() []*grpc.Server { +func (s *GitalyServerFactory) all() []*grpc.Server { var servers []*grpc.Server if s.secure != nil { servers = append(servers, s.secure) diff --git a/internal/bootstrap/starter/starter.go b/internal/bootstrap/starter/starter.go new file mode 100644 index 000000000..5751c033e --- /dev/null +++ b/internal/bootstrap/starter/starter.go @@ -0,0 +1,50 @@ +package starter + +import ( + "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/internal/bootstrap" +) + +const ( + // TCP is the prefix for tcp + TCP string = "tcp" + // TLS is the prefix for tls + TLS string = "tls" + // Unix is the prefix for unix + Unix string = "unix" +) + +// Config represents a network type, and address +type Config struct { + Name, Addr string +} + +func (c *Config) isSecure() bool { + return c.Name == TLS +} + +func (c *Config) family() string { + if c.isSecure() { + return TCP + } + + return c.Name +} + +// New creates a new bootstrap.Starter from a config and a GracefulStoppableServer +func New(cfg Config, servers bootstrap.GracefulStoppableServer) bootstrap.Starter { + return func(listen bootstrap.ListenFunc, errCh chan<- error) error { + l, err := listen(cfg.family(), cfg.Addr) + if err != nil { + return err + } + + logrus.WithField("address", cfg.Addr).Infof("listening at %s address", cfg.Name) + + go func() { + errCh <- servers.Serve(l, cfg.isSecure()) + }() + + return nil + } +} diff --git a/cmd/gitaly/starter_config_test.go b/internal/bootstrap/starter/starter_test.go index 377880f25..7a6a35158 100644 --- a/cmd/gitaly/starter_config_test.go +++ b/internal/bootstrap/starter/starter_test.go @@ -1,4 +1,4 @@ -package main +package starter import ( "testing" @@ -16,7 +16,7 @@ func TestIsSecure(t *testing.T) { {"tls", true}, } { t.Run(test.name, func(t *testing.T) { - conf := starterConfig{name: test.name} + conf := Config{Name: test.name} require.Equal(t, test.secure, conf.isSecure()) }) } @@ -31,7 +31,7 @@ func TestFamily(t *testing.T) { {"tls", "tcp"}, } { t.Run(test.name, func(t *testing.T) { - conf := starterConfig{name: test.name} + conf := Config{Name: test.name} require.Equal(t, test.family, conf.family()) }) } diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go index ff510e460..f9130bfd1 100644 --- a/internal/praefect/auth_test.go +++ b/internal/praefect/auth_test.go @@ -203,7 +203,7 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func listener, err := net.Listen("unix", serverSocketPath) require.NoError(t, err) - go srv.Start(listener) + go srv.Serve(listener, false) return srv, "unix://" + serverSocketPath, cleanup } diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go index b074c1f01..cdd4b3e81 100644 --- a/internal/praefect/config/config.go +++ b/internal/praefect/config/config.go @@ -13,6 +13,13 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/models" ) +const ( + // EnvPidFile is the name of the environment variable containing the pid file path + EnvPidFile = "PRAEFECT_PID_FILE" + // EnvUpgradesEnabled is an environment variable that when defined gitaly must enable graceful upgrades on SIGHUP + EnvUpgradesEnabled = "PRAEFECT_UPGRADES_ENABLED" +) + // Config is a container for everything found in the TOML config file type Config struct { ListenAddr string `toml:"listen_addr"` diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go index 4d11354bb..f0058be82 100644 --- a/internal/praefect/helper_test.go +++ b/internal/praefect/helper_test.go @@ -134,8 +134,9 @@ func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[st errQ := make(chan error) + prf.RegisterServices() go func() { - errQ <- prf.Start(listener) + errQ <- prf.Serve(listener, false) }() // dial client to praefect @@ -200,7 +201,8 @@ func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.Client errQ := make(chan error) ctx, cancel := testhelper.Context() - go func() { errQ <- prf.Start(listener) }() + prf.RegisterServices() + go func() { errQ <- prf.Serve(listener, false) }() go func() { errQ <- replmgr.ProcessBacklog(ctx) }() // dial client to praefect diff --git a/internal/praefect/server.go b/internal/praefect/server.go index 46ee62cea..ce9e0d0e4 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -108,17 +108,13 @@ func proxyRequiredOpts(director proxy.StreamDirector) []grpc.ServerOption { } } -// Start will start the praefect gRPC proxy server listening at the provided -// listener. Function will block until the server is stopped or an -// unrecoverable error occurs. -func (srv *Server) Start(lis net.Listener) error { - srv.registerServices() - - return srv.s.Serve(lis) +// Serve starts serving requests from the listener +func (srv *Server) Serve(l net.Listener, secure bool) error { + return srv.s.Serve(l) } -// registerServices will register any services praefect needs to handle rpcs on its own -func (srv *Server) registerServices() { +// RegisterServices will register any services praefect needs to handle rpcs on its own +func (srv *Server) RegisterServices() { // ServerServiceServer is necessary for the ServerInfo RPC gitalypb.RegisterServerServiceServer(srv.s, server.NewServer(srv.conf, srv.clientConnections)) @@ -143,3 +139,13 @@ func (srv *Server) Shutdown(ctx context.Context) error { return nil } } + +// GracefulStop stops the praefect server gracefully +func (srv *Server) GracefulStop() { + srv.s.GracefulStop() +} + +// Stop stops the praefect server +func (srv *Server) Stop() { + srv.s.Stop() +} |