Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJacob Vosmaer (GitLab) <jacob@gitlab.com>2017-08-22 14:32:11 +0300
committerAhmad Sherif <ahmad.m.sherif@gmail.com>2017-08-22 14:32:11 +0300
commit8b56a4a78952f6cbed1b13e1381fa7a93f7d6fea (patch)
tree669d48711aa82e6f798d4d676ec4b20e9de97673 /internal/supervisor
parent5d2f27592bf5c43b58559ccb57f05c0e03b792da (diff)
Respawn gitaly-ruby when it crashes
Diffstat (limited to 'internal/supervisor')
-rw-r--r--internal/supervisor/supervisor.go108
-rw-r--r--internal/supervisor/supervisor_test.go185
-rw-r--r--internal/supervisor/test-scripts/pid-server.go33
3 files changed, 316 insertions, 10 deletions
diff --git a/internal/supervisor/supervisor.go b/internal/supervisor/supervisor.go
index 4f9bba63e..5dea07e06 100644
--- a/internal/supervisor/supervisor.go
+++ b/internal/supervisor/supervisor.go
@@ -4,11 +4,39 @@ import (
"fmt"
"os"
"os/exec"
+ "sync"
+ "time"
+
+ log "github.com/Sirupsen/logrus"
+ "github.com/kelseyhightower/envconfig"
)
+// Config holds configuration for the circuit breaker of the respawn loop.
+type Config struct {
+ // GITALY_SUPERVISOR_CRASH_THRESHOLD
+ CrashThreshold int `split_words:"true" default:"5"`
+ // GITALY_SUPERVISOR_CRASH_WAIT_TIME
+ CrashWaitTime time.Duration `split_words:"true" default:"1m"`
+ // GITALY_SUPERVISOR_CRASH_RESET_TIME
+ CrashResetTime time.Duration `split_words:"true" default:"1m"`
+}
+
+var config Config
+
+func init() {
+ envconfig.MustProcess("gitaly_supervisor", &config)
+}
+
// Process represents a running process.
type Process struct {
- cmd *exec.Cmd
+ // Information to start the process
+ env []string
+ args []string
+ dir string
+
+ // Shutdown
+ done chan struct{}
+ stopOnce sync.Once
}
// New creates a new proces instance.
@@ -17,23 +45,83 @@ func New(env []string, args []string, dir string) (*Process, error) {
return nil, fmt.Errorf("need at least one argument")
}
- cmd := exec.Command(args[0], args[1:]...)
- cmd.Env = env
- cmd.Dir = dir
+ p := &Process{
+ env: env,
+ args: args,
+ dir: dir,
+ done: make(chan struct{}),
+ }
+
+ go watch(p)
+ return p, nil
+}
+
+func (p *Process) start() (*exec.Cmd, error) {
+ cmd := exec.Command(p.args[0], p.args[1:]...)
+ cmd.Env = p.env
+ cmd.Dir = p.dir
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
- // TODO: spawn goroutine that watches (restarts) this process.
- return &Process{cmd: cmd}, cmd.Start()
+ return cmd, cmd.Start()
+}
+
+func watch(p *Process) {
+ // Count crashes to prevent a tight respawn loop. This is a 'circuit breaker'.
+ crashes := 0
+
+ logger := log.WithField("supervisor.args", p.args)
+
+ for {
+ if crashes >= config.CrashThreshold {
+ logger.Warn("opening circuit breaker")
+ select {
+ case <-p.done:
+ return
+ case <-time.After(config.CrashWaitTime):
+ logger.Warn("closing circuit breaker")
+ crashes = 0
+ }
+ }
+
+ cmd, err := p.start()
+ if err != nil {
+ crashes++
+ logger.WithError(err).Error("start failed")
+ continue
+ }
+
+ waitCh := make(chan struct{})
+ go func() {
+ logger.WithError(cmd.Wait()).Warn("exited")
+ close(waitCh)
+ }()
+
+ waitLoop:
+ for {
+ select {
+ case <-time.After(config.CrashResetTime):
+ crashes = 0
+ case <-waitCh:
+ crashes++
+ break waitLoop
+ case <-p.done:
+ if cmd.Process != nil {
+ cmd.Process.Kill()
+ }
+ return
+ }
+ }
+ }
}
// Stop terminates the process.
func (p *Process) Stop() {
- if p == nil || p.cmd == nil || p.cmd.Process == nil {
+ if p == nil {
return
}
- process := p.cmd.Process
- process.Kill()
- process.Wait()
+ p.stopOnce.Do(func() {
+ close(p.done)
+ })
}
diff --git a/internal/supervisor/supervisor_test.go b/internal/supervisor/supervisor_test.go
new file mode 100644
index 000000000..857c55ff7
--- /dev/null
+++ b/internal/supervisor/supervisor_test.go
@@ -0,0 +1,185 @@
+package supervisor
+
+import (
+ "context"
+ "io/ioutil"
+ "log"
+ "net"
+ "os"
+ "os/exec"
+ "path"
+ "path/filepath"
+ "strconv"
+ "syscall"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+)
+
+var (
+ testDir string
+ testExe string
+ socketPath string
+)
+
+func TestMain(m *testing.M) {
+ os.Exit(testMain(m))
+}
+
+func testMain(m *testing.M) int {
+ var err error
+
+ testDir, err = ioutil.TempDir("", "gitaly-supervisor-test")
+ if err != nil {
+ log.Fatal(err)
+ }
+ defer os.RemoveAll(testDir)
+
+ scriptPath, err := filepath.Abs("test-scripts/pid-server.go")
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ testExe = path.Join(testDir, "pid-server")
+ buildCmd := exec.Command("go", "build", "-o", testExe, scriptPath)
+ buildCmd.Dir = testDir
+ buildCmd.Stderr = os.Stderr
+ buildCmd.Stdout = os.Stdout
+ if err := buildCmd.Run(); err != nil {
+ log.Fatal(err)
+ }
+
+ socketPath = path.Join(testDir, "socket")
+
+ return m.Run()
+}
+
+func TestRespawnAfterCrashWithoutCircuitBreaker(t *testing.T) {
+ process, err := New(nil, []string{testExe}, testDir)
+ require.NoError(t, err)
+ defer process.Stop()
+
+ attempts := config.CrashThreshold
+ require.True(t, attempts > 2, "config.CrashThreshold sanity check")
+
+ pids, err := tryConnect(socketPath, attempts, 1*time.Second)
+ require.NoError(t, err)
+
+ require.Equal(t, attempts, len(pids), "number of pids should equal number of attempts")
+
+ previous := 0
+ for _, pid := range pids {
+ require.True(t, pid > 0, "pid > 0")
+ require.NotEqual(t, previous, pid, "pid sanity check")
+ previous = pid
+ }
+}
+
+func TestTooManyCrashes(t *testing.T) {
+ process, err := New(nil, []string{testExe}, testDir)
+ require.NoError(t, err)
+ defer process.Stop()
+
+ attempts := config.CrashThreshold + 1
+ require.True(t, attempts > 2, "config.CrashThreshold sanity check")
+
+ pids, err := tryConnect(socketPath, attempts, 1*time.Second)
+ require.Error(t, err, "circuit breaker should cause a connection error / timeout")
+
+ require.Equal(t, config.CrashThreshold, len(pids), "number of pids should equal circuit breaker threshold")
+}
+
+func TestSpawnFailure(t *testing.T) {
+ defer func(waitTime time.Duration) {
+ config.CrashWaitTime = waitTime
+ }(config.CrashWaitTime)
+
+ config.CrashWaitTime = 2 * time.Second
+
+ notFoundExe := path.Join(testDir, "not-found")
+ require.NoError(t, os.RemoveAll(notFoundExe))
+ defer os.Remove(notFoundExe)
+
+ process, err := New(nil, []string{notFoundExe}, testDir)
+ require.NoError(t, err)
+ defer process.Stop()
+
+ time.Sleep(1 * time.Second)
+
+ pids, err := tryConnect(socketPath, 1, 1*time.Millisecond)
+ require.Error(t, err, "connection must fail because executable cannot be spawned")
+ require.Equal(t, 0, len(pids))
+
+ // 'Fix' the spawning problem of our process
+ require.NoError(t, os.Symlink(testExe, notFoundExe))
+
+ // After CrashWaitTime, the circuit breaker should have closed
+ pids, err = tryConnect(socketPath, 1, config.CrashWaitTime)
+
+ require.NoError(t, err, "process should be accepting connections now")
+ require.Equal(t, 1, len(pids), "we should have received the pid of the new process")
+ require.True(t, pids[0] > 0, "pid sanity check")
+}
+
+func tryConnect(socketPath string, attempts int, timeout time.Duration) (pids []int, err error) {
+ ctx, cancel := context.WithTimeout(context.Background(), timeout)
+ defer cancel()
+
+ for j := 0; j < attempts; j++ {
+ var curPid int
+ for {
+ curPid, err = getPid(ctx, socketPath)
+ if err == nil {
+ break
+ }
+
+ select {
+ case <-ctx.Done():
+ return pids, ctx.Err()
+ case <-time.After(5 * time.Millisecond):
+ // sleep
+ }
+ }
+ if err != nil {
+ return pids, err
+ }
+
+ pids = append(pids, curPid)
+ if curPid > 0 {
+ syscall.Kill(curPid, syscall.SIGKILL)
+ }
+ }
+
+ return pids, err
+}
+
+func getPid(ctx context.Context, socket string) (int, error) {
+ var err error
+ var conn net.Conn
+
+ for {
+ conn, err = net.DialTimeout("unix", socket, 1*time.Millisecond)
+ if err == nil {
+ break
+ }
+
+ select {
+ case <-ctx.Done():
+ return 0, ctx.Err()
+ case <-time.After(5 * time.Millisecond):
+ // sleep
+ }
+ }
+ if err != nil {
+ return 0, err
+ }
+ defer conn.Close()
+
+ response, err := ioutil.ReadAll(conn)
+ if err != nil {
+ return 0, err
+ }
+
+ return strconv.Atoi(string(response))
+}
diff --git a/internal/supervisor/test-scripts/pid-server.go b/internal/supervisor/test-scripts/pid-server.go
new file mode 100644
index 000000000..f7c5e056d
--- /dev/null
+++ b/internal/supervisor/test-scripts/pid-server.go
@@ -0,0 +1,33 @@
+package main
+
+import (
+ "fmt"
+ "log"
+ "net"
+ "os"
+)
+
+func main() {
+ if err := os.RemoveAll("socket"); err != nil {
+ log.Fatal(err)
+ }
+
+ l, err := net.Listen("unix", "socket")
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ for {
+ conn, err := l.Accept()
+ if err != nil {
+ log.Print(err)
+ continue
+ }
+
+ if _, err := fmt.Fprintf(conn, "%d", os.Getpid()); err != nil {
+ log.Print(err)
+ }
+
+ conn.Close()
+ }
+}