Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavlo Strokov <pstrokov@gitlab.com>2021-10-11 14:59:17 +0300
committerPavlo Strokov <pstrokov@gitlab.com>2021-10-12 11:15:57 +0300
commitcab525cc34cce2062ecaafcd544d47906121c45b (patch)
tree192731a18e5c207c2314f5c5101989f4ee13a7e7
parentc29860242695db7fc10aa47ff9c38d0c47293beb (diff)
bootstrap: Abstract bootstrapper for testing
The old implementation of the bootstrapper initialization does not allow calling the 'run' function to start a service because the tableflip library doesn't support multiple instances to be created for one process. Starting the Praefect service is required in tests to verify sub-command execution. The bootstrapper initialization extracted out of 'run' function. It allows using a new Noop bootstrapper to run service without tableflip support.
-rw-r--r--cmd/gitaly/main.go4
-rw-r--r--cmd/praefect/main.go17
-rw-r--r--cmd/praefect/subcmd_remove_repository_test.go7
-rw-r--r--internal/bootstrap/bootstrap.go72
-rw-r--r--internal/bootstrap/bootstrap_test.go38
5 files changed, 95 insertions, 43 deletions
diff --git a/cmd/gitaly/main.go b/cmd/gitaly/main.go
index 6405abc95..e66d13682 100644
--- a/cmd/gitaly/main.go
+++ b/cmd/gitaly/main.go
@@ -180,8 +180,6 @@ func run(cfg config.Cfg) error {
return fmt.Errorf("linguist instance creation: %w", err)
}
- b.StopAction = gitalyServerFactory.GracefulStop
-
rubySrv := rubyserver.New(cfg)
if err := rubySrv.Start(); err != nil {
return fmt.Errorf("initialize gitaly-ruby: %v", err)
@@ -282,5 +280,5 @@ func run(cfg config.Cfg) error {
}
}()
- return b.Wait(cfg.GracefulRestartTimeout.Duration())
+ return b.Wait(cfg.GracefulRestartTimeout.Duration(), gitalyServerFactory.GracefulStop)
}
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index b2607bd18..58b3afd5f 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -167,7 +167,12 @@ func main() {
logger.Fatalf("%s", err)
}
- if err := run(starterConfigs, conf); err != nil {
+ b, err := bootstrap.New()
+ if err != nil {
+ logger.Fatalf("unable to create a bootstrap: %v", err)
+ }
+
+ if err := run(starterConfigs, conf, b); err != nil {
logger.Fatalf("%v", err)
}
}
@@ -206,7 +211,7 @@ func configure(conf config.Config) {
sentry.ConfigureSentry(version.GetVersion(), conf.Sentry)
}
-func run(cfgs []starter.Config, conf config.Config) error {
+func run(cfgs []starter.Config, conf config.Config, b bootstrap.Listener) error {
nodeLatencyHistogram, err := metrics.RegisterNodeLatency(conf.Prometheus)
if err != nil {
return err
@@ -412,12 +417,6 @@ func run(cfgs []starter.Config, conf config.Config) error {
}
prometheus.MustRegister(metricsCollectors...)
- b, err := bootstrap.New()
- if err != nil {
- return fmt.Errorf("unable to create a bootstrap: %v", err)
- }
-
- b.StopAction = srvFactory.GracefulStop
for _, cfg := range cfgs {
srv, err := srvFactory.Create(cfg.IsSecure())
if err != nil {
@@ -500,7 +499,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
}
}
- return b.Wait(conf.GracefulStopTimeout.Duration())
+ return b.Wait(conf.GracefulStopTimeout.Duration(), srvFactory.GracefulStop)
}
func getStarterConfigs(conf config.Config) ([]starter.Config, error) {
diff --git a/cmd/praefect/subcmd_remove_repository_test.go b/cmd/praefect/subcmd_remove_repository_test.go
index e204b8419..11b7fd024 100644
--- a/cmd/praefect/subcmd_remove_repository_test.go
+++ b/cmd/praefect/subcmd_remove_repository_test.go
@@ -6,7 +6,6 @@ import (
"flag"
"path/filepath"
"strings"
- "syscall"
"testing"
"time"
@@ -15,6 +14,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/client"
+ "gitlab.com/gitlab-org/gitaly/internal/bootstrap"
"gitlab.com/gitlab-org/gitaly/internal/gitaly/service/setup"
"gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
@@ -102,9 +102,10 @@ func TestRemoveRepository_Exec(t *testing.T) {
starterConfigs, err := getStarterConfigs(conf)
require.NoError(t, err)
stopped := make(chan struct{})
+ bootstrapper := bootstrap.NewNoop()
go func() {
defer close(stopped)
- err := run(starterConfigs, conf)
+ err := run(starterConfigs, conf, bootstrapper)
assert.EqualError(t, err, `received signal "terminated"`)
}()
@@ -226,7 +227,7 @@ func TestRemoveRepository_Exec(t *testing.T) {
requireNoDatabaseInfo(t, db, cmd)
})
- require.NoError(t, syscall.Kill(syscall.Getpid(), syscall.SIGTERM))
+ bootstrapper.Terminate()
<-stopped
}
diff --git a/internal/bootstrap/bootstrap.go b/internal/bootstrap/bootstrap.go
index 5f1f87e04..23877b795 100644
--- a/internal/bootstrap/bootstrap.go
+++ b/internal/bootstrap/bootstrap.go
@@ -21,11 +21,18 @@ const (
socketReusePortWarning = "Unable to set SO_REUSEPORT: zero downtime upgrades will not work"
)
+// Listener is an interface of the bootstrap manager.
+type Listener interface {
+ // RegisterStarter adds starter to the pool.
+ RegisterStarter(starter Starter)
+ // Start starts all registered starters to accept connections.
+ Start() error
+ // Wait terminates all registered starters.
+ Wait(gracefulTimeout time.Duration, stopAction func()) error
+}
+
// Bootstrap handles graceful upgrades
type Bootstrap struct {
- // StopAction will be invoked during a graceful stop. It must wait until the shutdown is completed
- StopAction func()
-
upgrader upgrader
listenFunc ListenFunc
errChan chan error
@@ -150,7 +157,8 @@ func (b *Bootstrap) Start() error {
// Wait will signal process readiness to the parent and than wait for an exit condition
// SIGTERM, SIGINT and a runtime error will trigger an immediate shutdown
// in case of an upgrade there will be a grace period to complete the ongoing requests
-func (b *Bootstrap) Wait(gracefulTimeout time.Duration) error {
+// stopAction will be invoked during a graceful stop. It must wait until the shutdown is completed.
+func (b *Bootstrap) Wait(gracefulTimeout time.Duration, stopAction func()) error {
signals := []os.Signal{syscall.SIGTERM, syscall.SIGINT}
immediateShutdown := make(chan os.Signal, len(signals))
signal.Notify(immediateShutdown, signals...)
@@ -166,7 +174,7 @@ func (b *Bootstrap) Wait(gracefulTimeout time.Duration) error {
// the new process signaled its readiness and we started a graceful stop
// however no further upgrades can be started until this process is running
// we set a grace period and then we force a termination.
- waitError := b.waitGracePeriod(gracefulTimeout, immediateShutdown)
+ waitError := b.waitGracePeriod(gracefulTimeout, immediateShutdown, stopAction)
err = fmt.Errorf("graceful upgrade: %v", waitError)
case s := <-immediateShutdown:
@@ -177,13 +185,13 @@ func (b *Bootstrap) Wait(gracefulTimeout time.Duration) error {
return err
}
-func (b *Bootstrap) waitGracePeriod(gracefulTimeout time.Duration, kill <-chan os.Signal) error {
+func (b *Bootstrap) waitGracePeriod(gracefulTimeout time.Duration, kill <-chan os.Signal, stopAction func()) error {
log.WithField("graceful_timeout", gracefulTimeout).Warn("starting grace period")
allServersDone := make(chan struct{})
go func() {
- if b.StopAction != nil {
- b.StopAction()
+ if stopAction != nil {
+ stopAction()
}
close(allServersDone)
}()
@@ -207,3 +215,51 @@ func (b *Bootstrap) listen(network, path string) (net.Listener, error) {
return b.listenFunc(network, path)
}
+
+// Noop is a bootstrapper that does no additional configurations.
+type Noop struct {
+ starters []Starter
+ shutdown chan struct{}
+ errChan chan error
+}
+
+// NewNoop returns initialized instance of the *Noop.
+func NewNoop() *Noop {
+ return &Noop{shutdown: make(chan struct{})}
+}
+
+// RegisterStarter adds starter to the pool.
+func (n *Noop) RegisterStarter(starter Starter) {
+ n.starters = append(n.starters, starter)
+}
+
+// Start starts all registered starters to accept connections.
+func (n *Noop) Start() error {
+ n.errChan = make(chan error, len(n.starters))
+
+ for _, start := range n.starters {
+ if err := start(net.Listen, n.errChan); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// Wait terminates all registered starters.
+func (n *Noop) Wait(_ time.Duration, stopAction func()) error {
+ select {
+ case <-n.shutdown:
+ if stopAction != nil {
+ stopAction()
+ }
+ case err := <-n.errChan:
+ return err
+ }
+
+ return nil
+}
+
+// Terminate unblocks Wait method and executes stopAction call-back passed into it.
+func (n *Noop) Terminate() {
+ close(n.shutdown)
+}
diff --git a/internal/bootstrap/bootstrap_test.go b/internal/bootstrap/bootstrap_test.go
index dff68d09b..b37181932 100644
--- a/internal/bootstrap/bootstrap_test.go
+++ b/internal/bootstrap/bootstrap_test.go
@@ -104,10 +104,10 @@ func waitWithTimeout(t *testing.T, waitCh <-chan error, timeout time.Duration) e
}
func TestImmediateTerminationOnSocketError(t *testing.T) {
- b, server := makeBootstrap(t)
+ b, server, stopAction := makeBootstrap(t)
waitCh := make(chan error)
- go func() { waitCh <- b.Wait(2 * time.Second) }()
+ go func() { waitCh <- b.Wait(2*time.Second, stopAction) }()
require.NoError(t, server.listeners["tcp"].Close(), "Closing first listener")
@@ -119,12 +119,12 @@ func TestImmediateTerminationOnSocketError(t *testing.T) {
func TestImmediateTerminationOnSignal(t *testing.T) {
for _, sig := range []syscall.Signal{syscall.SIGTERM, syscall.SIGINT} {
t.Run(sig.String(), func(t *testing.T) {
- b, server := makeBootstrap(t)
+ b, server, stopAction := makeBootstrap(t)
done := server.slowRequest(3 * time.Minute)
waitCh := make(chan error)
- go func() { waitCh <- b.Wait(2 * time.Second) }()
+ go func() { waitCh <- b.Wait(2*time.Second, stopAction) }()
// make sure we are inside b.Wait() or we'll kill the test suite
time.Sleep(100 * time.Millisecond)
@@ -146,9 +146,9 @@ func TestImmediateTerminationOnSignal(t *testing.T) {
}
func TestGracefulTerminationStuck(t *testing.T) {
- b, server := makeBootstrap(t)
+ b, server, stopAction := makeBootstrap(t)
- err := testGracefulUpdate(t, server, b, 3*time.Second, 2*time.Second, nil)
+ err := testGracefulUpdate(t, server, b, 3*time.Second, 2*time.Second, nil, stopAction)
require.Contains(t, err.Error(), "grace period expired")
}
@@ -158,22 +158,22 @@ func TestGracefulTerminationWithSignals(t *testing.T) {
for _, sig := range []syscall.Signal{syscall.SIGTERM, syscall.SIGINT} {
t.Run(sig.String(), func(t *testing.T) {
- b, server := makeBootstrap(t)
+ b, server, stopAction := makeBootstrap(t)
err := testGracefulUpdate(t, server, b, 1*time.Second, 2*time.Second, func() {
require.NoError(t, self.Signal(sig))
- })
+ }, stopAction)
require.Contains(t, err.Error(), "force shutdown")
})
}
}
func TestGracefulTerminationServerErrors(t *testing.T) {
- b, server := makeBootstrap(t)
+ b, server, _ := makeBootstrap(t)
done := make(chan error, 1)
// This is a simulation of receiving a listener error during waitGracePeriod
- b.StopAction = func() {
+ stopAction := func() {
// we close the unix listener in order to test that the shutdown will not fail, but it keep waiting for the TCP request
require.NoError(t, server.listeners["unix"].Close())
@@ -185,19 +185,19 @@ func TestGracefulTerminationServerErrors(t *testing.T) {
require.NoError(t, server.server.Shutdown(context.Background()))
}
- err := testGracefulUpdate(t, server, b, 3*time.Second, 2*time.Second, nil)
+ err := testGracefulUpdate(t, server, b, 3*time.Second, 2*time.Second, nil, stopAction)
require.Contains(t, err.Error(), "grace period expired")
require.NoError(t, <-done)
}
func TestGracefulTermination(t *testing.T) {
- b, server := makeBootstrap(t)
+ b, server, _ := makeBootstrap(t)
// Using server.Close we bypass the graceful shutdown faking a completed shutdown
- b.StopAction = func() { server.server.Close() }
+ stopAction := func() { server.server.Close() }
- err := testGracefulUpdate(t, server, b, 1*time.Second, 2*time.Second, nil)
+ err := testGracefulUpdate(t, server, b, 1*time.Second, 2*time.Second, nil, stopAction)
require.Contains(t, err.Error(), "completed")
}
@@ -219,9 +219,9 @@ func TestPortReuse(t *testing.T) {
require.NoError(t, l.Close())
}
-func testGracefulUpdate(t *testing.T, server *testServer, b *Bootstrap, waitTimeout, gracefulWait time.Duration, duringGracePeriodCallback func()) error {
+func testGracefulUpdate(t *testing.T, server *testServer, b *Bootstrap, waitTimeout, gracefulWait time.Duration, duringGracePeriodCallback, stopAction func()) error {
waitCh := make(chan error)
- go func() { waitCh <- b.Wait(gracefulWait) }()
+ go func() { waitCh <- b.Wait(gracefulWait, stopAction) }()
// Start a slow request to keep the old server from shutting down immediately.
req := server.slowRequest(2 * gracefulWait)
@@ -251,7 +251,7 @@ func testGracefulUpdate(t *testing.T, server *testServer, b *Bootstrap, waitTime
return waitErr
}
-func makeBootstrap(t *testing.T) (*Bootstrap, *testServer) {
+func makeBootstrap(t *testing.T) (*Bootstrap, *testServer, func()) {
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(200)
@@ -271,8 +271,6 @@ func makeBootstrap(t *testing.T) (*Bootstrap, *testServer) {
b, err := _new(u, net.Listen, false)
require.NoError(t, err)
- b.StopAction = func() { require.NoError(t, s.Shutdown(context.Background())) }
-
listeners := make(map[string]net.Listener)
start := func(network, address string) Starter {
return func(listen ListenFunc, errors chan<- error) error {
@@ -312,7 +310,7 @@ func makeBootstrap(t *testing.T) (*Bootstrap, *testServer) {
server: &s,
listeners: listeners,
url: url,
- }
+ }, func() { require.NoError(t, s.Shutdown(context.Background())) }
}
func testAllListeners(t *testing.T, listeners map[string]net.Listener) {