diff options
author | Pavlo Strokov <pstrokov@gitlab.com> | 2021-10-04 21:21:11 +0300 |
---|---|---|
committer | Pavlo Strokov <pstrokov@gitlab.com> | 2021-10-08 10:46:19 +0300 |
commit | 18ff3676f30667f70c8435903a4c9832a1c5e594 (patch) | |
tree | 9930a03eee4de834a92c21dab222347d9b9d260b | |
parent | 171eb484b55b5450fa76956bb0cd44a169e4a664 (diff) |
bootstrap: Abstract bootstrapper for testing
The old implementation of the bootstrapper initialization
does not allow calling the 'run' function to start a service
because the tableflip library doesn't support multiple
instances to be created for one process.
Starting the Praefect service is required in tests to verify
sub-command execution. The bootstrapper initialization
extracted out of 'run' function. It allows using a new
Noop bootstrapper to run service without tableflip
support.
-rw-r--r-- | cmd/gitaly/main.go | 4 | ||||
-rw-r--r-- | cmd/praefect/main.go | 17 | ||||
-rw-r--r-- | cmd/praefect/subcmd_list_untracked_repositories_test.go | 8 | ||||
-rw-r--r-- | cmd/praefect/subcmd_remove_repository_test.go | 8 | ||||
-rw-r--r-- | internal/bootstrap/bootstrap.go | 72 | ||||
-rw-r--r-- | internal/bootstrap/bootstrap_test.go | 38 |
6 files changed, 99 insertions, 48 deletions
diff --git a/cmd/gitaly/main.go b/cmd/gitaly/main.go index 3b901c45c..db7c73a06 100644 --- a/cmd/gitaly/main.go +++ b/cmd/gitaly/main.go @@ -199,8 +199,6 @@ func run(cfg config.Cfg) error { return fmt.Errorf("linguist instance creation: %w", err) } - b.StopAction = gitalyServerFactory.GracefulStop - rubySrv := rubyserver.New(cfg) if err := rubySrv.Start(); err != nil { return fmt.Errorf("initialize gitaly-ruby: %v", err) @@ -303,5 +301,5 @@ func run(cfg config.Cfg) error { } }() - return b.Wait(cfg.GracefulRestartTimeout.Duration()) + return b.Wait(cfg.GracefulRestartTimeout.Duration(), gitalyServerFactory.GracefulStop) } diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index 44a0981f0..128f30cea 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -144,7 +144,12 @@ func main() { logger.Fatalf("%s", err) } - if err := run(starterConfigs, conf); err != nil { + b, err := bootstrap.New() + if err != nil { + logger.Fatalf("unable to create a bootstrap: %v", err) + } + + if err := run(starterConfigs, conf, b); err != nil { logger.Fatalf("%v", err) } } @@ -187,7 +192,7 @@ func configure(conf config.Config) { sentry.ConfigureSentry(version.GetVersion(), conf.Sentry) } -func run(cfgs []starter.Config, conf config.Config) error { +func run(cfgs []starter.Config, conf config.Config, b bootstrap.Listener) error { nodeLatencyHistogram, err := metrics.RegisterNodeLatency(conf.Prometheus) if err != nil { return err @@ -391,12 +396,6 @@ func run(cfgs []starter.Config, conf config.Config) error { } prometheus.MustRegister(metricsCollectors...) - b, err := bootstrap.New() - if err != nil { - return fmt.Errorf("unable to create a bootstrap: %v", err) - } - - b.StopAction = srvFactory.GracefulStop for _, cfg := range cfgs { srv, err := srvFactory.Create(cfg.IsSecure()) if err != nil { @@ -476,7 +475,7 @@ func run(cfgs []starter.Config, conf config.Config) error { logger.Warn(`Repository cleanup background task disabled as "repositories_cleanup.run_interval" is not set or 0.`) } - return b.Wait(conf.GracefulStopTimeout.Duration()) + return b.Wait(conf.GracefulStopTimeout.Duration(), srvFactory.GracefulStop) } func getStarterConfigs(conf config.Config) ([]starter.Config, error) { diff --git a/cmd/praefect/subcmd_list_untracked_repositories_test.go b/cmd/praefect/subcmd_list_untracked_repositories_test.go index 561dc1457..3184588a3 100644 --- a/cmd/praefect/subcmd_list_untracked_repositories_test.go +++ b/cmd/praefect/subcmd_list_untracked_repositories_test.go @@ -6,13 +6,13 @@ import ( "flag" "fmt" "strings" - "syscall" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/client" + "gitlab.com/gitlab-org/gitaly/v14/internal/bootstrap" "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" @@ -85,10 +85,10 @@ func TestListUntrackedRepositories_Exec(t *testing.T) { starterConfigs, err := getStarterConfigs(conf) require.NoError(t, err) stopped := make(chan struct{}) + bootstrapper := bootstrap.NewNoop() go func() { defer close(stopped) - err := run(starterConfigs, conf) - assert.EqualError(t, err, `received signal "terminated"`) + assert.NoError(t, run(starterConfigs, conf, bootstrapper)) }() cc, err := client.Dial("unix://"+conf.SocketPath, nil) @@ -115,7 +115,7 @@ func TestListUntrackedRepositories_Exec(t *testing.T) { } require.ElementsMatch(t, exp, strings.Split(out.String(), "\n")) - require.NoError(t, syscall.Kill(syscall.Getpid(), syscall.SIGTERM)) + bootstrapper.Terminate() <-stopped } diff --git a/cmd/praefect/subcmd_remove_repository_test.go b/cmd/praefect/subcmd_remove_repository_test.go index aa18bbdce..2305b947c 100644 --- a/cmd/praefect/subcmd_remove_repository_test.go +++ b/cmd/praefect/subcmd_remove_repository_test.go @@ -4,7 +4,6 @@ import ( "flag" "path/filepath" "strings" - "syscall" "testing" "time" @@ -13,6 +12,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/client" + "gitlab.com/gitlab-org/gitaly/v14/internal/bootstrap" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" @@ -108,10 +108,10 @@ func TestRemoveRepository_Exec(t *testing.T) { starterConfigs, err := getStarterConfigs(conf) require.NoError(t, err) stopped := make(chan struct{}) + bootstrapper := bootstrap.NewNoop() go func() { defer close(stopped) - err := run(starterConfigs, conf) - assert.EqualError(t, err, `received signal "terminated"`) + assert.NoError(t, run(starterConfigs, conf, bootstrapper)) }() cc, err := client.Dial("unix://"+conf.SocketPath, nil) @@ -210,7 +210,7 @@ func TestRemoveRepository_Exec(t *testing.T) { requireNoDatabaseInfo(t, db, cmd) }) - require.NoError(t, syscall.Kill(syscall.Getpid(), syscall.SIGTERM)) + bootstrapper.Terminate() <-stopped } diff --git a/internal/bootstrap/bootstrap.go b/internal/bootstrap/bootstrap.go index 95e43aa10..f1d355926 100644 --- a/internal/bootstrap/bootstrap.go +++ b/internal/bootstrap/bootstrap.go @@ -22,11 +22,18 @@ const ( socketReusePortWarning = "Unable to set SO_REUSEPORT: zero downtime upgrades will not work" ) +// Listener is an interface of the bootstrap manager. +type Listener interface { + // RegisterStarter adds starter to the pool. + RegisterStarter(starter Starter) + // Start starts all registered starters to accept connections. + Start() error + // Wait terminates all registered starters. + Wait(gracefulTimeout time.Duration, stopAction func()) error +} + // Bootstrap handles graceful upgrades type Bootstrap struct { - // StopAction will be invoked during a graceful stop. It must wait until the shutdown is completed - StopAction func() - upgrader upgrader listenFunc ListenFunc errChan chan error @@ -152,7 +159,8 @@ func (b *Bootstrap) Start() error { // Wait will signal process readiness to the parent and than wait for an exit condition // SIGTERM, SIGINT and a runtime error will trigger an immediate shutdown // in case of an upgrade there will be a grace period to complete the ongoing requests -func (b *Bootstrap) Wait(gracefulTimeout time.Duration) error { +// stopAction will be invoked during a graceful stop. It must wait until the shutdown is completed. +func (b *Bootstrap) Wait(gracefulTimeout time.Duration, stopAction func()) error { signals := []os.Signal{syscall.SIGTERM, syscall.SIGINT} immediateShutdown := make(chan os.Signal, len(signals)) signal.Notify(immediateShutdown, signals...) @@ -168,7 +176,7 @@ func (b *Bootstrap) Wait(gracefulTimeout time.Duration) error { // the new process signaled its readiness and we started a graceful stop // however no further upgrades can be started until this process is running // we set a grace period and then we force a termination. - waitError := b.waitGracePeriod(gracefulTimeout, immediateShutdown) + waitError := b.waitGracePeriod(gracefulTimeout, immediateShutdown, stopAction) err = fmt.Errorf("graceful upgrade: %v", waitError) case s := <-immediateShutdown: @@ -180,13 +188,13 @@ func (b *Bootstrap) Wait(gracefulTimeout time.Duration) error { return err } -func (b *Bootstrap) waitGracePeriod(gracefulTimeout time.Duration, kill <-chan os.Signal) error { +func (b *Bootstrap) waitGracePeriod(gracefulTimeout time.Duration, kill <-chan os.Signal, stopAction func()) error { log.WithField("graceful_timeout", gracefulTimeout).Warn("starting grace period") allServersDone := make(chan struct{}) go func() { - if b.StopAction != nil { - b.StopAction() + if stopAction != nil { + stopAction() } close(allServersDone) }() @@ -210,3 +218,51 @@ func (b *Bootstrap) listen(network, path string) (net.Listener, error) { return b.listenFunc(network, path) } + +// Noop is a bootstrapper that does no additional configurations. +type Noop struct { + starters []Starter + shutdown chan struct{} + errChan chan error +} + +// NewNoop returns initialized instance of the *Noop. +func NewNoop() *Noop { + return &Noop{shutdown: make(chan struct{})} +} + +// RegisterStarter adds starter to the pool. +func (n *Noop) RegisterStarter(starter Starter) { + n.starters = append(n.starters, starter) +} + +// Start starts all registered starters to accept connections. +func (n *Noop) Start() error { + n.errChan = make(chan error, len(n.starters)) + + for _, start := range n.starters { + if err := start(net.Listen, n.errChan); err != nil { + return err + } + } + return nil +} + +// Wait terminates all registered starters. +func (n *Noop) Wait(_ time.Duration, stopAction func()) error { + select { + case <-n.shutdown: + if stopAction != nil { + stopAction() + } + case err := <-n.errChan: + return err + } + + return nil +} + +// Terminate unblocks Wait method and executes stopAction call-back passed into it. +func (n *Noop) Terminate() { + close(n.shutdown) +} diff --git a/internal/bootstrap/bootstrap_test.go b/internal/bootstrap/bootstrap_test.go index 1fb6c3485..83e0d2b5e 100644 --- a/internal/bootstrap/bootstrap_test.go +++ b/internal/bootstrap/bootstrap_test.go @@ -109,10 +109,10 @@ func TestImmediateTerminationOnSocketError(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - b, server := makeBootstrap(t, ctx) + b, server, stopAction := makeBootstrap(t, ctx) waitCh := make(chan error) - go func() { waitCh <- b.Wait(2 * time.Second) }() + go func() { waitCh <- b.Wait(2*time.Second, stopAction) }() require.NoError(t, server.listeners["tcp"].Close(), "Closing first listener") @@ -127,12 +127,12 @@ func TestImmediateTerminationOnSignal(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - b, server := makeBootstrap(t, ctx) + b, server, stopAction := makeBootstrap(t, ctx) done := server.slowRequest(3 * time.Minute) waitCh := make(chan error) - go func() { waitCh <- b.Wait(2 * time.Second) }() + go func() { waitCh <- b.Wait(2*time.Second, stopAction) }() // make sure we are inside b.Wait() or we'll kill the test suite time.Sleep(100 * time.Millisecond) @@ -157,9 +157,9 @@ func TestGracefulTerminationStuck(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - b, server := makeBootstrap(t, ctx) + b, server, stopAction := makeBootstrap(t, ctx) - err := testGracefulUpdate(t, server, b, 3*time.Second, 2*time.Second, nil) + err := testGracefulUpdate(t, server, b, 3*time.Second, 2*time.Second, nil, stopAction) require.Contains(t, err.Error(), "grace period expired") } @@ -171,11 +171,11 @@ func TestGracefulTerminationWithSignals(t *testing.T) { t.Run(sig.String(), func(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - b, server := makeBootstrap(t, ctx) + b, server, stopAction := makeBootstrap(t, ctx) err := testGracefulUpdate(t, server, b, 1*time.Second, 2*time.Second, func() { require.NoError(t, self.Signal(sig)) - }) + }, stopAction) require.Contains(t, err.Error(), "force shutdown") }) } @@ -184,11 +184,11 @@ func TestGracefulTerminationWithSignals(t *testing.T) { func TestGracefulTerminationServerErrors(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - b, server := makeBootstrap(t, ctx) + b, server, _ := makeBootstrap(t, ctx) done := make(chan error, 1) // This is a simulation of receiving a listener error during waitGracePeriod - b.StopAction = func() { + stopAction := func() { // we close the unix listener in order to test that the shutdown will not fail, but it keep waiting for the TCP request require.NoError(t, server.listeners["unix"].Close()) @@ -200,7 +200,7 @@ func TestGracefulTerminationServerErrors(t *testing.T) { require.NoError(t, server.server.Shutdown(context.Background())) } - err := testGracefulUpdate(t, server, b, 3*time.Second, 2*time.Second, nil) + err := testGracefulUpdate(t, server, b, 3*time.Second, 2*time.Second, nil, stopAction) require.Contains(t, err.Error(), "grace period expired") require.NoError(t, <-done) @@ -209,12 +209,12 @@ func TestGracefulTerminationServerErrors(t *testing.T) { func TestGracefulTermination(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - b, server := makeBootstrap(t, ctx) + b, server, _ := makeBootstrap(t, ctx) // Using server.Close we bypass the graceful shutdown faking a completed shutdown - b.StopAction = func() { server.server.Close() } + stopAction := func() { server.server.Close() } - err := testGracefulUpdate(t, server, b, 1*time.Second, 2*time.Second, nil) + err := testGracefulUpdate(t, server, b, 1*time.Second, 2*time.Second, nil, stopAction) require.Contains(t, err.Error(), "completed") } @@ -236,9 +236,9 @@ func TestPortReuse(t *testing.T) { b.upgrader.Stop() } -func testGracefulUpdate(t *testing.T, server *testServer, b *Bootstrap, waitTimeout, gracefulWait time.Duration, duringGracePeriodCallback func()) error { +func testGracefulUpdate(t *testing.T, server *testServer, b *Bootstrap, waitTimeout, gracefulWait time.Duration, duringGracePeriodCallback func(), stopAction func()) error { waitCh := make(chan error) - go func() { waitCh <- b.Wait(gracefulWait) }() + go func() { waitCh <- b.Wait(gracefulWait, stopAction) }() // Start a slow request to keep the old server from shutting down immediately. req := server.slowRequest(2 * gracefulWait) @@ -268,7 +268,7 @@ func testGracefulUpdate(t *testing.T, server *testServer, b *Bootstrap, waitTime return waitErr } -func makeBootstrap(t *testing.T, ctx context.Context) (*Bootstrap, *testServer) { +func makeBootstrap(t *testing.T, ctx context.Context) (*Bootstrap, *testServer, func()) { mux := http.NewServeMux() mux.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(200) @@ -292,8 +292,6 @@ func makeBootstrap(t *testing.T, ctx context.Context) (*Bootstrap, *testServer) b, err := _new(u, net.Listen, false) require.NoError(t, err) - b.StopAction = func() { require.NoError(t, s.Shutdown(context.Background())) } - listeners := make(map[string]net.Listener) start := func(network, address string) Starter { return func(listen ListenFunc, errors chan<- error) error { @@ -333,7 +331,7 @@ func makeBootstrap(t *testing.T, ctx context.Context) (*Bootstrap, *testServer) server: &s, listeners: listeners, url: url, - } + }, func() { require.NoError(t, s.Shutdown(context.Background())) } } func testAllListeners(t *testing.T, listeners map[string]net.Listener) { |