Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2019-11-27 04:57:09 +0300
committerJohn Cai <jcai@gitlab.com>2019-11-27 04:57:09 +0300
commit1332007de9a21540582ed752dd664382369eaca9 (patch)
treee3e158da21a79a4ca2b4feeade654505d326785a
parent82d31745e0901dc91c2fe02f44ff8ff659a22c09 (diff)
parent7c4f16371b8f051f1eb3a7e989d26ea7eb962a8e (diff)
Merge branch 'jc-zero-downtime-praefect' into 'master'
Wire in bootstrap package for praefect for zero downtime deploy Closes #1898 See merge request gitlab-org/gitaly!1638
-rw-r--r--changelogs/unreleased/jc-zero-downtime-praefect.yml5
-rw-r--r--cmd/gitaly/main.go15
-rw-r--r--cmd/gitaly/starter_config.go45
-rw-r--r--cmd/praefect/main.go83
-rw-r--r--internal/bootstrap/server_factory.go28
-rw-r--r--internal/bootstrap/starter/starter.go50
-rw-r--r--internal/bootstrap/starter/starter_test.go (renamed from cmd/gitaly/starter_config_test.go)6
-rw-r--r--internal/praefect/auth_test.go2
-rw-r--r--internal/praefect/config/config.go7
-rw-r--r--internal/praefect/helper_test.go6
-rw-r--r--internal/praefect/server.go24
11 files changed, 145 insertions, 126 deletions
diff --git a/changelogs/unreleased/jc-zero-downtime-praefect.yml b/changelogs/unreleased/jc-zero-downtime-praefect.yml
new file mode 100644
index 000000000..f8f922677
--- /dev/null
+++ b/changelogs/unreleased/jc-zero-downtime-praefect.yml
@@ -0,0 +1,5 @@
+---
+title: Leverage the bootstrap package to support Praefect zero downtime deploys
+merge_request: 1638
+author:
+type: other
diff --git a/cmd/gitaly/main.go b/cmd/gitaly/main.go
index b6b819b60..624c08747 100644
--- a/cmd/gitaly/main.go
+++ b/cmd/gitaly/main.go
@@ -7,6 +7,7 @@ import (
log "github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/internal/bootstrap"
+ "gitlab.com/gitlab-org/gitaly/internal/bootstrap/starter"
"gitlab.com/gitlab-org/gitaly/internal/config"
"gitlab.com/gitlab-org/gitaly/internal/config/sentry"
"gitlab.com/gitlab-org/gitaly/internal/git"
@@ -84,21 +85,21 @@ func main() {
// Inside here we can use deferred functions. This is needed because
// log.Fatal bypasses deferred functions.
func run(b *bootstrap.Bootstrap) error {
- servers := bootstrap.NewServerFactory()
+ servers := bootstrap.NewGitalyServerFactory()
defer servers.Stop()
b.StopAction = servers.GracefulStop
- for _, c := range []starterConfig{
- {unix, config.Config.SocketPath},
- {tcp, config.Config.ListenAddr},
- {tls, config.Config.TLSListenAddr},
+ for _, c := range []starter.Config{
+ {starter.Unix, config.Config.SocketPath},
+ {starter.TCP, config.Config.ListenAddr},
+ {starter.TLS, config.Config.TLSListenAddr},
} {
- if c.addr == "" {
+ if c.Addr == "" {
continue
}
- b.RegisterStarter(gitalyStarter(c, servers))
+ b.RegisterStarter(starter.New(c, servers))
}
if addr := config.Config.PrometheusListenAddr; addr != "" {
diff --git a/cmd/gitaly/starter_config.go b/cmd/gitaly/starter_config.go
deleted file mode 100644
index 889855815..000000000
--- a/cmd/gitaly/starter_config.go
+++ /dev/null
@@ -1,45 +0,0 @@
-package main
-
-import (
- "github.com/sirupsen/logrus"
- "gitlab.com/gitlab-org/gitaly/internal/bootstrap"
-)
-
-const (
- tcp string = "tcp"
- tls string = "tls"
- unix string = "unix"
-)
-
-type starterConfig struct {
- name, addr string
-}
-
-func (s *starterConfig) isSecure() bool {
- return s.name == tls
-}
-
-func (s *starterConfig) family() string {
- if s.isSecure() {
- return tcp
- }
-
- return s.name
-}
-
-func gitalyStarter(cfg starterConfig, servers bootstrap.GracefulStoppableServer) bootstrap.Starter {
- return func(listen bootstrap.ListenFunc, errCh chan<- error) error {
- l, err := listen(cfg.family(), cfg.addr)
- if err != nil {
- return err
- }
-
- logrus.WithField("address", cfg.addr).Infof("listening at %s address", cfg.name)
-
- go func() {
- errCh <- servers.Serve(l, cfg.isSecure())
- }()
-
- return nil
- }
-}
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 271041e8c..8fb6c0c9c 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -5,13 +5,11 @@ import (
"errors"
"flag"
"fmt"
- "net"
"os"
- "os/signal"
"strings"
- "syscall"
- "time"
+ "gitlab.com/gitlab-org/gitaly/internal/bootstrap"
+ "gitlab.com/gitlab-org/gitaly/internal/bootstrap/starter"
"gitlab.com/gitlab-org/gitaly/internal/config/sentry"
"gitlab.com/gitlab-org/gitaly/internal/log"
"gitlab.com/gitlab-org/gitaly/internal/praefect"
@@ -46,12 +44,12 @@ func main() {
logger.Fatal(err)
}
- listeners, err := getListeners(conf.SocketPath, conf.ListenAddr)
+ starterConfigs, err := getStarterConfigs(conf.SocketPath, conf.ListenAddr)
if err != nil {
logger.Fatalf("%s", err)
}
- if err := run(listeners, conf); err != nil {
+ if err := run(starterConfigs, conf); err != nil {
logger.Fatalf("%v", err)
}
}
@@ -94,7 +92,7 @@ func configure() (config.Config, error) {
return conf, nil
}
-func run(listeners []net.Listener, conf config.Config) error {
+func run(cfgs []starter.Config, conf config.Config) error {
clientConnections := conn.NewClientConnections()
for _, virtualStorage := range conf.VirtualStorages {
@@ -112,74 +110,63 @@ func run(listeners []net.Listener, conf config.Config) error {
var (
// top level server dependencies
- ds = datastore.NewInMemory(conf)
- coordinator = praefect.NewCoordinator(logger, ds, clientConnections, conf, protoregistry.GitalyProtoFileDescriptors...)
- repl = praefect.NewReplMgr("default", logger, ds, clientConnections)
- srv = praefect.NewServer(coordinator, repl, nil, logger, clientConnections, conf)
- // signal related
- signals = []os.Signal{syscall.SIGTERM, syscall.SIGINT}
- termCh = make(chan os.Signal, len(signals))
+ ds = datastore.NewInMemory(conf)
+ coordinator = praefect.NewCoordinator(logger, ds, clientConnections, conf, protoregistry.GitalyProtoFileDescriptors...)
+ repl = praefect.NewReplMgr("default", logger, ds, clientConnections)
+ srv = praefect.NewServer(coordinator, repl, nil, logger, clientConnections, conf)
serverErrors = make(chan error, 1)
)
- signal.Notify(termCh, signals...)
-
- for _, l := range listeners {
- go func(lis net.Listener) { serverErrors <- srv.Start(lis) }(l)
- }
-
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- go func() { serverErrors <- repl.ProcessBacklog(ctx) }()
+ _, isWrapped := os.LookupEnv(config.EnvUpgradesEnabled)
- go coordinator.FailoverRotation()
+ b, err := bootstrap.New(os.Getenv(config.EnvPidFile), isWrapped)
+ if err != nil {
+ return fmt.Errorf("unable to create a bootstrap: %v", err)
+ }
- select {
- case s := <-termCh:
- logger.WithField("signal", s).Warn("received signal, shutting down gracefully")
- cancel() // cancels the replicator job processing
+ srv.RegisterServices()
- ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
- if shutdownErr := srv.Shutdown(ctx); shutdownErr != nil {
- logger.Warnf("error received during shutting down: %v", shutdownErr)
- return shutdownErr
- }
- case err := <-serverErrors:
- return err
+ b.StopAction = srv.GracefulStop
+ for _, cfg := range cfgs {
+ b.RegisterStarter(starter.New(cfg, srv))
}
- return nil
-}
+ if err := b.Start(); err != nil {
+ return fmt.Errorf("unable to start the bootstrap: %v", err)
+ }
-func getListeners(socketPath, listenAddr string) ([]net.Listener, error) {
- var listeners []net.Listener
+ go func() { serverErrors <- b.Wait() }()
+ go func() { serverErrors <- repl.ProcessBacklog(ctx) }()
+
+ go coordinator.FailoverRotation()
+
+ return <-serverErrors
+}
+func getStarterConfigs(socketPath, listenAddr string) ([]starter.Config, error) {
+ var cfgs []starter.Config
if socketPath != "" {
if err := os.RemoveAll(socketPath); err != nil {
return nil, err
}
cleanPath := strings.TrimPrefix(socketPath, "unix:")
- l, err := net.Listen("unix", cleanPath)
- if err != nil {
- return nil, err
- }
- listeners = append(listeners, l)
+ cfgs = append(cfgs, starter.Config{Name: starter.Unix, Addr: cleanPath})
logger.WithField("address", socketPath).Info("listening on unix socket")
}
if listenAddr != "" {
- l, err := net.Listen("tcp", listenAddr)
- if err != nil {
- return nil, err
- }
+ cleanAddr := strings.TrimPrefix(listenAddr, "tcp://")
+
+ cfgs = append(cfgs, starter.Config{Name: starter.TCP, Addr: cleanAddr})
- listeners = append(listeners, l)
logger.WithField("address", listenAddr).Info("listening at tcp address")
}
- return listeners, nil
+ return cfgs, nil
}
diff --git a/internal/bootstrap/server_factory.go b/internal/bootstrap/server_factory.go
index bb26a59d4..ca6d870d1 100644
--- a/internal/bootstrap/server_factory.go
+++ b/internal/bootstrap/server_factory.go
@@ -9,7 +9,8 @@ import (
"google.golang.org/grpc"
)
-type serverFactory struct {
+// GitalyServerFactory is a factory of gitaly grpc servers
+type GitalyServerFactory struct {
ruby *rubyserver.Server
secure, insecure *grpc.Server
}
@@ -19,17 +20,20 @@ type GracefulStoppableServer interface {
GracefulStop()
Stop()
Serve(l net.Listener, secure bool) error
- StartRuby() error
}
-// NewServerFactory initializes a rubyserver and then lazily initializes both secure and insecure grpc.Server
-func NewServerFactory() GracefulStoppableServer {
- return &serverFactory{ruby: &rubyserver.Server{}}
+// NewGitalyServerFactory initializes a rubyserver and then lazily initializes both secure and insecure grpc.Server
+func NewGitalyServerFactory() *GitalyServerFactory {
+ return &GitalyServerFactory{ruby: &rubyserver.Server{}}
}
-func (s *serverFactory) StartRuby() error { return s.ruby.Start() }
+// StartRuby starts the ruby process
+func (s *GitalyServerFactory) StartRuby() error {
+ return s.ruby.Start()
+}
-func (s *serverFactory) Stop() {
+// Stop stops both the secure and insecure servers
+func (s *GitalyServerFactory) Stop() {
for _, srv := range s.all() {
srv.Stop()
}
@@ -37,7 +41,8 @@ func (s *serverFactory) Stop() {
s.ruby.Stop()
}
-func (s *serverFactory) GracefulStop() {
+// GracefulStop stops both the secure and insecure servers gracefully
+func (s *GitalyServerFactory) GracefulStop() {
wg := sync.WaitGroup{}
for _, srv := range s.all() {
@@ -52,13 +57,14 @@ func (s *serverFactory) GracefulStop() {
wg.Wait()
}
-func (s *serverFactory) Serve(l net.Listener, secure bool) error {
+// Serve starts serving the listener
+func (s *GitalyServerFactory) Serve(l net.Listener, secure bool) error {
srv := s.get(secure)
return srv.Serve(l)
}
-func (s *serverFactory) get(secure bool) *grpc.Server {
+func (s *GitalyServerFactory) get(secure bool) *grpc.Server {
if secure {
if s.secure == nil {
s.secure = server.NewSecure(s.ruby)
@@ -74,7 +80,7 @@ func (s *serverFactory) get(secure bool) *grpc.Server {
return s.insecure
}
-func (s *serverFactory) all() []*grpc.Server {
+func (s *GitalyServerFactory) all() []*grpc.Server {
var servers []*grpc.Server
if s.secure != nil {
servers = append(servers, s.secure)
diff --git a/internal/bootstrap/starter/starter.go b/internal/bootstrap/starter/starter.go
new file mode 100644
index 000000000..5751c033e
--- /dev/null
+++ b/internal/bootstrap/starter/starter.go
@@ -0,0 +1,50 @@
+package starter
+
+import (
+ "github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/internal/bootstrap"
+)
+
+const (
+ // TCP is the prefix for tcp
+ TCP string = "tcp"
+ // TLS is the prefix for tls
+ TLS string = "tls"
+ // Unix is the prefix for unix
+ Unix string = "unix"
+)
+
+// Config represents a network type, and address
+type Config struct {
+ Name, Addr string
+}
+
+func (c *Config) isSecure() bool {
+ return c.Name == TLS
+}
+
+func (c *Config) family() string {
+ if c.isSecure() {
+ return TCP
+ }
+
+ return c.Name
+}
+
+// New creates a new bootstrap.Starter from a config and a GracefulStoppableServer
+func New(cfg Config, servers bootstrap.GracefulStoppableServer) bootstrap.Starter {
+ return func(listen bootstrap.ListenFunc, errCh chan<- error) error {
+ l, err := listen(cfg.family(), cfg.Addr)
+ if err != nil {
+ return err
+ }
+
+ logrus.WithField("address", cfg.Addr).Infof("listening at %s address", cfg.Name)
+
+ go func() {
+ errCh <- servers.Serve(l, cfg.isSecure())
+ }()
+
+ return nil
+ }
+}
diff --git a/cmd/gitaly/starter_config_test.go b/internal/bootstrap/starter/starter_test.go
index 377880f25..7a6a35158 100644
--- a/cmd/gitaly/starter_config_test.go
+++ b/internal/bootstrap/starter/starter_test.go
@@ -1,4 +1,4 @@
-package main
+package starter
import (
"testing"
@@ -16,7 +16,7 @@ func TestIsSecure(t *testing.T) {
{"tls", true},
} {
t.Run(test.name, func(t *testing.T) {
- conf := starterConfig{name: test.name}
+ conf := Config{Name: test.name}
require.Equal(t, test.secure, conf.isSecure())
})
}
@@ -31,7 +31,7 @@ func TestFamily(t *testing.T) {
{"tls", "tcp"},
} {
t.Run(test.name, func(t *testing.T) {
- conf := starterConfig{name: test.name}
+ conf := Config{Name: test.name}
require.Equal(t, test.family, conf.family())
})
}
diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go
index ff510e460..f9130bfd1 100644
--- a/internal/praefect/auth_test.go
+++ b/internal/praefect/auth_test.go
@@ -203,7 +203,7 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func
listener, err := net.Listen("unix", serverSocketPath)
require.NoError(t, err)
- go srv.Start(listener)
+ go srv.Serve(listener, false)
return srv, "unix://" + serverSocketPath, cleanup
}
diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go
index b074c1f01..cdd4b3e81 100644
--- a/internal/praefect/config/config.go
+++ b/internal/praefect/config/config.go
@@ -13,6 +13,13 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
)
+const (
+ // EnvPidFile is the name of the environment variable containing the pid file path
+ EnvPidFile = "PRAEFECT_PID_FILE"
+ // EnvUpgradesEnabled is an environment variable that when defined gitaly must enable graceful upgrades on SIGHUP
+ EnvUpgradesEnabled = "PRAEFECT_UPGRADES_ENABLED"
+)
+
// Config is a container for everything found in the TOML config file
type Config struct {
ListenAddr string `toml:"listen_addr"`
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index 4d11354bb..f0058be82 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -134,8 +134,9 @@ func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[st
errQ := make(chan error)
+ prf.RegisterServices()
go func() {
- errQ <- prf.Start(listener)
+ errQ <- prf.Serve(listener, false)
}()
// dial client to praefect
@@ -200,7 +201,8 @@ func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.Client
errQ := make(chan error)
ctx, cancel := testhelper.Context()
- go func() { errQ <- prf.Start(listener) }()
+ prf.RegisterServices()
+ go func() { errQ <- prf.Serve(listener, false) }()
go func() { errQ <- replmgr.ProcessBacklog(ctx) }()
// dial client to praefect
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index 46ee62cea..ce9e0d0e4 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -108,17 +108,13 @@ func proxyRequiredOpts(director proxy.StreamDirector) []grpc.ServerOption {
}
}
-// Start will start the praefect gRPC proxy server listening at the provided
-// listener. Function will block until the server is stopped or an
-// unrecoverable error occurs.
-func (srv *Server) Start(lis net.Listener) error {
- srv.registerServices()
-
- return srv.s.Serve(lis)
+// Serve starts serving requests from the listener
+func (srv *Server) Serve(l net.Listener, secure bool) error {
+ return srv.s.Serve(l)
}
-// registerServices will register any services praefect needs to handle rpcs on its own
-func (srv *Server) registerServices() {
+// RegisterServices will register any services praefect needs to handle rpcs on its own
+func (srv *Server) RegisterServices() {
// ServerServiceServer is necessary for the ServerInfo RPC
gitalypb.RegisterServerServiceServer(srv.s, server.NewServer(srv.conf, srv.clientConnections))
@@ -143,3 +139,13 @@ func (srv *Server) Shutdown(ctx context.Context) error {
return nil
}
}
+
+// GracefulStop stops the praefect server gracefully
+func (srv *Server) GracefulStop() {
+ srv.s.GracefulStop()
+}
+
+// Stop stops the praefect server
+func (srv *Server) Stop() {
+ srv.s.Stop()
+}