Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavlo Strokov <pstrokov@gitlab.com>2021-10-04 21:21:11 +0300
committerPavlo Strokov <pstrokov@gitlab.com>2021-10-08 10:46:19 +0300
commit18ff3676f30667f70c8435903a4c9832a1c5e594 (patch)
tree9930a03eee4de834a92c21dab222347d9b9d260b
parent171eb484b55b5450fa76956bb0cd44a169e4a664 (diff)
bootstrap: Abstract bootstrapper for testing
The old implementation of the bootstrapper initialization does not allow calling the 'run' function to start a service because the tableflip library doesn't support multiple instances to be created for one process. Starting the Praefect service is required in tests to verify sub-command execution. The bootstrapper initialization extracted out of 'run' function. It allows using a new Noop bootstrapper to run service without tableflip support.
-rw-r--r--cmd/gitaly/main.go4
-rw-r--r--cmd/praefect/main.go17
-rw-r--r--cmd/praefect/subcmd_list_untracked_repositories_test.go8
-rw-r--r--cmd/praefect/subcmd_remove_repository_test.go8
-rw-r--r--internal/bootstrap/bootstrap.go72
-rw-r--r--internal/bootstrap/bootstrap_test.go38
6 files changed, 99 insertions, 48 deletions
diff --git a/cmd/gitaly/main.go b/cmd/gitaly/main.go
index 3b901c45c..db7c73a06 100644
--- a/cmd/gitaly/main.go
+++ b/cmd/gitaly/main.go
@@ -199,8 +199,6 @@ func run(cfg config.Cfg) error {
return fmt.Errorf("linguist instance creation: %w", err)
}
- b.StopAction = gitalyServerFactory.GracefulStop
-
rubySrv := rubyserver.New(cfg)
if err := rubySrv.Start(); err != nil {
return fmt.Errorf("initialize gitaly-ruby: %v", err)
@@ -303,5 +301,5 @@ func run(cfg config.Cfg) error {
}
}()
- return b.Wait(cfg.GracefulRestartTimeout.Duration())
+ return b.Wait(cfg.GracefulRestartTimeout.Duration(), gitalyServerFactory.GracefulStop)
}
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 44a0981f0..128f30cea 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -144,7 +144,12 @@ func main() {
logger.Fatalf("%s", err)
}
- if err := run(starterConfigs, conf); err != nil {
+ b, err := bootstrap.New()
+ if err != nil {
+ logger.Fatalf("unable to create a bootstrap: %v", err)
+ }
+
+ if err := run(starterConfigs, conf, b); err != nil {
logger.Fatalf("%v", err)
}
}
@@ -187,7 +192,7 @@ func configure(conf config.Config) {
sentry.ConfigureSentry(version.GetVersion(), conf.Sentry)
}
-func run(cfgs []starter.Config, conf config.Config) error {
+func run(cfgs []starter.Config, conf config.Config, b bootstrap.Listener) error {
nodeLatencyHistogram, err := metrics.RegisterNodeLatency(conf.Prometheus)
if err != nil {
return err
@@ -391,12 +396,6 @@ func run(cfgs []starter.Config, conf config.Config) error {
}
prometheus.MustRegister(metricsCollectors...)
- b, err := bootstrap.New()
- if err != nil {
- return fmt.Errorf("unable to create a bootstrap: %v", err)
- }
-
- b.StopAction = srvFactory.GracefulStop
for _, cfg := range cfgs {
srv, err := srvFactory.Create(cfg.IsSecure())
if err != nil {
@@ -476,7 +475,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
logger.Warn(`Repository cleanup background task disabled as "repositories_cleanup.run_interval" is not set or 0.`)
}
- return b.Wait(conf.GracefulStopTimeout.Duration())
+ return b.Wait(conf.GracefulStopTimeout.Duration(), srvFactory.GracefulStop)
}
func getStarterConfigs(conf config.Config) ([]starter.Config, error) {
diff --git a/cmd/praefect/subcmd_list_untracked_repositories_test.go b/cmd/praefect/subcmd_list_untracked_repositories_test.go
index 561dc1457..3184588a3 100644
--- a/cmd/praefect/subcmd_list_untracked_repositories_test.go
+++ b/cmd/praefect/subcmd_list_untracked_repositories_test.go
@@ -6,13 +6,13 @@ import (
"flag"
"fmt"
"strings"
- "syscall"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/client"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/bootstrap"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
@@ -85,10 +85,10 @@ func TestListUntrackedRepositories_Exec(t *testing.T) {
starterConfigs, err := getStarterConfigs(conf)
require.NoError(t, err)
stopped := make(chan struct{})
+ bootstrapper := bootstrap.NewNoop()
go func() {
defer close(stopped)
- err := run(starterConfigs, conf)
- assert.EqualError(t, err, `received signal "terminated"`)
+ assert.NoError(t, run(starterConfigs, conf, bootstrapper))
}()
cc, err := client.Dial("unix://"+conf.SocketPath, nil)
@@ -115,7 +115,7 @@ func TestListUntrackedRepositories_Exec(t *testing.T) {
}
require.ElementsMatch(t, exp, strings.Split(out.String(), "\n"))
- require.NoError(t, syscall.Kill(syscall.Getpid(), syscall.SIGTERM))
+ bootstrapper.Terminate()
<-stopped
}
diff --git a/cmd/praefect/subcmd_remove_repository_test.go b/cmd/praefect/subcmd_remove_repository_test.go
index aa18bbdce..2305b947c 100644
--- a/cmd/praefect/subcmd_remove_repository_test.go
+++ b/cmd/praefect/subcmd_remove_repository_test.go
@@ -4,7 +4,6 @@ import (
"flag"
"path/filepath"
"strings"
- "syscall"
"testing"
"time"
@@ -13,6 +12,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/client"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/bootstrap"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
@@ -108,10 +108,10 @@ func TestRemoveRepository_Exec(t *testing.T) {
starterConfigs, err := getStarterConfigs(conf)
require.NoError(t, err)
stopped := make(chan struct{})
+ bootstrapper := bootstrap.NewNoop()
go func() {
defer close(stopped)
- err := run(starterConfigs, conf)
- assert.EqualError(t, err, `received signal "terminated"`)
+ assert.NoError(t, run(starterConfigs, conf, bootstrapper))
}()
cc, err := client.Dial("unix://"+conf.SocketPath, nil)
@@ -210,7 +210,7 @@ func TestRemoveRepository_Exec(t *testing.T) {
requireNoDatabaseInfo(t, db, cmd)
})
- require.NoError(t, syscall.Kill(syscall.Getpid(), syscall.SIGTERM))
+ bootstrapper.Terminate()
<-stopped
}
diff --git a/internal/bootstrap/bootstrap.go b/internal/bootstrap/bootstrap.go
index 95e43aa10..f1d355926 100644
--- a/internal/bootstrap/bootstrap.go
+++ b/internal/bootstrap/bootstrap.go
@@ -22,11 +22,18 @@ const (
socketReusePortWarning = "Unable to set SO_REUSEPORT: zero downtime upgrades will not work"
)
+// Listener is an interface of the bootstrap manager.
+type Listener interface {
+ // RegisterStarter adds starter to the pool.
+ RegisterStarter(starter Starter)
+ // Start starts all registered starters to accept connections.
+ Start() error
+ // Wait terminates all registered starters.
+ Wait(gracefulTimeout time.Duration, stopAction func()) error
+}
+
// Bootstrap handles graceful upgrades
type Bootstrap struct {
- // StopAction will be invoked during a graceful stop. It must wait until the shutdown is completed
- StopAction func()
-
upgrader upgrader
listenFunc ListenFunc
errChan chan error
@@ -152,7 +159,8 @@ func (b *Bootstrap) Start() error {
// Wait will signal process readiness to the parent and than wait for an exit condition
// SIGTERM, SIGINT and a runtime error will trigger an immediate shutdown
// in case of an upgrade there will be a grace period to complete the ongoing requests
-func (b *Bootstrap) Wait(gracefulTimeout time.Duration) error {
+// stopAction will be invoked during a graceful stop. It must wait until the shutdown is completed.
+func (b *Bootstrap) Wait(gracefulTimeout time.Duration, stopAction func()) error {
signals := []os.Signal{syscall.SIGTERM, syscall.SIGINT}
immediateShutdown := make(chan os.Signal, len(signals))
signal.Notify(immediateShutdown, signals...)
@@ -168,7 +176,7 @@ func (b *Bootstrap) Wait(gracefulTimeout time.Duration) error {
// the new process signaled its readiness and we started a graceful stop
// however no further upgrades can be started until this process is running
// we set a grace period and then we force a termination.
- waitError := b.waitGracePeriod(gracefulTimeout, immediateShutdown)
+ waitError := b.waitGracePeriod(gracefulTimeout, immediateShutdown, stopAction)
err = fmt.Errorf("graceful upgrade: %v", waitError)
case s := <-immediateShutdown:
@@ -180,13 +188,13 @@ func (b *Bootstrap) Wait(gracefulTimeout time.Duration) error {
return err
}
-func (b *Bootstrap) waitGracePeriod(gracefulTimeout time.Duration, kill <-chan os.Signal) error {
+func (b *Bootstrap) waitGracePeriod(gracefulTimeout time.Duration, kill <-chan os.Signal, stopAction func()) error {
log.WithField("graceful_timeout", gracefulTimeout).Warn("starting grace period")
allServersDone := make(chan struct{})
go func() {
- if b.StopAction != nil {
- b.StopAction()
+ if stopAction != nil {
+ stopAction()
}
close(allServersDone)
}()
@@ -210,3 +218,51 @@ func (b *Bootstrap) listen(network, path string) (net.Listener, error) {
return b.listenFunc(network, path)
}
+
+// Noop is a bootstrapper that does no additional configurations.
+type Noop struct {
+ starters []Starter
+ shutdown chan struct{}
+ errChan chan error
+}
+
+// NewNoop returns initialized instance of the *Noop.
+func NewNoop() *Noop {
+ return &Noop{shutdown: make(chan struct{})}
+}
+
+// RegisterStarter adds starter to the pool.
+func (n *Noop) RegisterStarter(starter Starter) {
+ n.starters = append(n.starters, starter)
+}
+
+// Start starts all registered starters to accept connections.
+func (n *Noop) Start() error {
+ n.errChan = make(chan error, len(n.starters))
+
+ for _, start := range n.starters {
+ if err := start(net.Listen, n.errChan); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// Wait terminates all registered starters.
+func (n *Noop) Wait(_ time.Duration, stopAction func()) error {
+ select {
+ case <-n.shutdown:
+ if stopAction != nil {
+ stopAction()
+ }
+ case err := <-n.errChan:
+ return err
+ }
+
+ return nil
+}
+
+// Terminate unblocks Wait method and executes stopAction call-back passed into it.
+func (n *Noop) Terminate() {
+ close(n.shutdown)
+}
diff --git a/internal/bootstrap/bootstrap_test.go b/internal/bootstrap/bootstrap_test.go
index 1fb6c3485..83e0d2b5e 100644
--- a/internal/bootstrap/bootstrap_test.go
+++ b/internal/bootstrap/bootstrap_test.go
@@ -109,10 +109,10 @@ func TestImmediateTerminationOnSocketError(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- b, server := makeBootstrap(t, ctx)
+ b, server, stopAction := makeBootstrap(t, ctx)
waitCh := make(chan error)
- go func() { waitCh <- b.Wait(2 * time.Second) }()
+ go func() { waitCh <- b.Wait(2*time.Second, stopAction) }()
require.NoError(t, server.listeners["tcp"].Close(), "Closing first listener")
@@ -127,12 +127,12 @@ func TestImmediateTerminationOnSignal(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- b, server := makeBootstrap(t, ctx)
+ b, server, stopAction := makeBootstrap(t, ctx)
done := server.slowRequest(3 * time.Minute)
waitCh := make(chan error)
- go func() { waitCh <- b.Wait(2 * time.Second) }()
+ go func() { waitCh <- b.Wait(2*time.Second, stopAction) }()
// make sure we are inside b.Wait() or we'll kill the test suite
time.Sleep(100 * time.Millisecond)
@@ -157,9 +157,9 @@ func TestGracefulTerminationStuck(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- b, server := makeBootstrap(t, ctx)
+ b, server, stopAction := makeBootstrap(t, ctx)
- err := testGracefulUpdate(t, server, b, 3*time.Second, 2*time.Second, nil)
+ err := testGracefulUpdate(t, server, b, 3*time.Second, 2*time.Second, nil, stopAction)
require.Contains(t, err.Error(), "grace period expired")
}
@@ -171,11 +171,11 @@ func TestGracefulTerminationWithSignals(t *testing.T) {
t.Run(sig.String(), func(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- b, server := makeBootstrap(t, ctx)
+ b, server, stopAction := makeBootstrap(t, ctx)
err := testGracefulUpdate(t, server, b, 1*time.Second, 2*time.Second, func() {
require.NoError(t, self.Signal(sig))
- })
+ }, stopAction)
require.Contains(t, err.Error(), "force shutdown")
})
}
@@ -184,11 +184,11 @@ func TestGracefulTerminationWithSignals(t *testing.T) {
func TestGracefulTerminationServerErrors(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- b, server := makeBootstrap(t, ctx)
+ b, server, _ := makeBootstrap(t, ctx)
done := make(chan error, 1)
// This is a simulation of receiving a listener error during waitGracePeriod
- b.StopAction = func() {
+ stopAction := func() {
// we close the unix listener in order to test that the shutdown will not fail, but it keep waiting for the TCP request
require.NoError(t, server.listeners["unix"].Close())
@@ -200,7 +200,7 @@ func TestGracefulTerminationServerErrors(t *testing.T) {
require.NoError(t, server.server.Shutdown(context.Background()))
}
- err := testGracefulUpdate(t, server, b, 3*time.Second, 2*time.Second, nil)
+ err := testGracefulUpdate(t, server, b, 3*time.Second, 2*time.Second, nil, stopAction)
require.Contains(t, err.Error(), "grace period expired")
require.NoError(t, <-done)
@@ -209,12 +209,12 @@ func TestGracefulTerminationServerErrors(t *testing.T) {
func TestGracefulTermination(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- b, server := makeBootstrap(t, ctx)
+ b, server, _ := makeBootstrap(t, ctx)
// Using server.Close we bypass the graceful shutdown faking a completed shutdown
- b.StopAction = func() { server.server.Close() }
+ stopAction := func() { server.server.Close() }
- err := testGracefulUpdate(t, server, b, 1*time.Second, 2*time.Second, nil)
+ err := testGracefulUpdate(t, server, b, 1*time.Second, 2*time.Second, nil, stopAction)
require.Contains(t, err.Error(), "completed")
}
@@ -236,9 +236,9 @@ func TestPortReuse(t *testing.T) {
b.upgrader.Stop()
}
-func testGracefulUpdate(t *testing.T, server *testServer, b *Bootstrap, waitTimeout, gracefulWait time.Duration, duringGracePeriodCallback func()) error {
+func testGracefulUpdate(t *testing.T, server *testServer, b *Bootstrap, waitTimeout, gracefulWait time.Duration, duringGracePeriodCallback func(), stopAction func()) error {
waitCh := make(chan error)
- go func() { waitCh <- b.Wait(gracefulWait) }()
+ go func() { waitCh <- b.Wait(gracefulWait, stopAction) }()
// Start a slow request to keep the old server from shutting down immediately.
req := server.slowRequest(2 * gracefulWait)
@@ -268,7 +268,7 @@ func testGracefulUpdate(t *testing.T, server *testServer, b *Bootstrap, waitTime
return waitErr
}
-func makeBootstrap(t *testing.T, ctx context.Context) (*Bootstrap, *testServer) {
+func makeBootstrap(t *testing.T, ctx context.Context) (*Bootstrap, *testServer, func()) {
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(200)
@@ -292,8 +292,6 @@ func makeBootstrap(t *testing.T, ctx context.Context) (*Bootstrap, *testServer)
b, err := _new(u, net.Listen, false)
require.NoError(t, err)
- b.StopAction = func() { require.NoError(t, s.Shutdown(context.Background())) }
-
listeners := make(map[string]net.Listener)
start := func(network, address string) Starter {
return func(listen ListenFunc, errors chan<- error) error {
@@ -333,7 +331,7 @@ func makeBootstrap(t *testing.T, ctx context.Context) (*Bootstrap, *testServer)
server: &s,
listeners: listeners,
url: url,
- }
+ }, func() { require.NoError(t, s.Shutdown(context.Background())) }
}
func testAllListeners(t *testing.T, listeners map[string]net.Listener) {