diff options
author | Jacob Vosmaer (GitLab) <jacob@gitlab.com> | 2017-08-22 14:32:11 +0300 |
---|---|---|
committer | Ahmad Sherif <ahmad.m.sherif@gmail.com> | 2017-08-22 14:32:11 +0300 |
commit | 8b56a4a78952f6cbed1b13e1381fa7a93f7d6fea (patch) | |
tree | 669d48711aa82e6f798d4d676ec4b20e9de97673 /internal/supervisor | |
parent | 5d2f27592bf5c43b58559ccb57f05c0e03b792da (diff) |
Respawn gitaly-ruby when it crashes
Diffstat (limited to 'internal/supervisor')
-rw-r--r-- | internal/supervisor/supervisor.go | 108 | ||||
-rw-r--r-- | internal/supervisor/supervisor_test.go | 185 | ||||
-rw-r--r-- | internal/supervisor/test-scripts/pid-server.go | 33 |
3 files changed, 316 insertions, 10 deletions
diff --git a/internal/supervisor/supervisor.go b/internal/supervisor/supervisor.go index 4f9bba63e..5dea07e06 100644 --- a/internal/supervisor/supervisor.go +++ b/internal/supervisor/supervisor.go @@ -4,11 +4,39 @@ import ( "fmt" "os" "os/exec" + "sync" + "time" + + log "github.com/Sirupsen/logrus" + "github.com/kelseyhightower/envconfig" ) +// Config holds configuration for the circuit breaker of the respawn loop. +type Config struct { + // GITALY_SUPERVISOR_CRASH_THRESHOLD + CrashThreshold int `split_words:"true" default:"5"` + // GITALY_SUPERVISOR_CRASH_WAIT_TIME + CrashWaitTime time.Duration `split_words:"true" default:"1m"` + // GITALY_SUPERVISOR_CRASH_RESET_TIME + CrashResetTime time.Duration `split_words:"true" default:"1m"` +} + +var config Config + +func init() { + envconfig.MustProcess("gitaly_supervisor", &config) +} + // Process represents a running process. type Process struct { - cmd *exec.Cmd + // Information to start the process + env []string + args []string + dir string + + // Shutdown + done chan struct{} + stopOnce sync.Once } // New creates a new proces instance. @@ -17,23 +45,83 @@ func New(env []string, args []string, dir string) (*Process, error) { return nil, fmt.Errorf("need at least one argument") } - cmd := exec.Command(args[0], args[1:]...) - cmd.Env = env - cmd.Dir = dir + p := &Process{ + env: env, + args: args, + dir: dir, + done: make(chan struct{}), + } + + go watch(p) + return p, nil +} + +func (p *Process) start() (*exec.Cmd, error) { + cmd := exec.Command(p.args[0], p.args[1:]...) + cmd.Env = p.env + cmd.Dir = p.dir cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr - // TODO: spawn goroutine that watches (restarts) this process. - return &Process{cmd: cmd}, cmd.Start() + return cmd, cmd.Start() +} + +func watch(p *Process) { + // Count crashes to prevent a tight respawn loop. This is a 'circuit breaker'. + crashes := 0 + + logger := log.WithField("supervisor.args", p.args) + + for { + if crashes >= config.CrashThreshold { + logger.Warn("opening circuit breaker") + select { + case <-p.done: + return + case <-time.After(config.CrashWaitTime): + logger.Warn("closing circuit breaker") + crashes = 0 + } + } + + cmd, err := p.start() + if err != nil { + crashes++ + logger.WithError(err).Error("start failed") + continue + } + + waitCh := make(chan struct{}) + go func() { + logger.WithError(cmd.Wait()).Warn("exited") + close(waitCh) + }() + + waitLoop: + for { + select { + case <-time.After(config.CrashResetTime): + crashes = 0 + case <-waitCh: + crashes++ + break waitLoop + case <-p.done: + if cmd.Process != nil { + cmd.Process.Kill() + } + return + } + } + } } // Stop terminates the process. func (p *Process) Stop() { - if p == nil || p.cmd == nil || p.cmd.Process == nil { + if p == nil { return } - process := p.cmd.Process - process.Kill() - process.Wait() + p.stopOnce.Do(func() { + close(p.done) + }) } diff --git a/internal/supervisor/supervisor_test.go b/internal/supervisor/supervisor_test.go new file mode 100644 index 000000000..857c55ff7 --- /dev/null +++ b/internal/supervisor/supervisor_test.go @@ -0,0 +1,185 @@ +package supervisor + +import ( + "context" + "io/ioutil" + "log" + "net" + "os" + "os/exec" + "path" + "path/filepath" + "strconv" + "syscall" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +var ( + testDir string + testExe string + socketPath string +) + +func TestMain(m *testing.M) { + os.Exit(testMain(m)) +} + +func testMain(m *testing.M) int { + var err error + + testDir, err = ioutil.TempDir("", "gitaly-supervisor-test") + if err != nil { + log.Fatal(err) + } + defer os.RemoveAll(testDir) + + scriptPath, err := filepath.Abs("test-scripts/pid-server.go") + if err != nil { + log.Fatal(err) + } + + testExe = path.Join(testDir, "pid-server") + buildCmd := exec.Command("go", "build", "-o", testExe, scriptPath) + buildCmd.Dir = testDir + buildCmd.Stderr = os.Stderr + buildCmd.Stdout = os.Stdout + if err := buildCmd.Run(); err != nil { + log.Fatal(err) + } + + socketPath = path.Join(testDir, "socket") + + return m.Run() +} + +func TestRespawnAfterCrashWithoutCircuitBreaker(t *testing.T) { + process, err := New(nil, []string{testExe}, testDir) + require.NoError(t, err) + defer process.Stop() + + attempts := config.CrashThreshold + require.True(t, attempts > 2, "config.CrashThreshold sanity check") + + pids, err := tryConnect(socketPath, attempts, 1*time.Second) + require.NoError(t, err) + + require.Equal(t, attempts, len(pids), "number of pids should equal number of attempts") + + previous := 0 + for _, pid := range pids { + require.True(t, pid > 0, "pid > 0") + require.NotEqual(t, previous, pid, "pid sanity check") + previous = pid + } +} + +func TestTooManyCrashes(t *testing.T) { + process, err := New(nil, []string{testExe}, testDir) + require.NoError(t, err) + defer process.Stop() + + attempts := config.CrashThreshold + 1 + require.True(t, attempts > 2, "config.CrashThreshold sanity check") + + pids, err := tryConnect(socketPath, attempts, 1*time.Second) + require.Error(t, err, "circuit breaker should cause a connection error / timeout") + + require.Equal(t, config.CrashThreshold, len(pids), "number of pids should equal circuit breaker threshold") +} + +func TestSpawnFailure(t *testing.T) { + defer func(waitTime time.Duration) { + config.CrashWaitTime = waitTime + }(config.CrashWaitTime) + + config.CrashWaitTime = 2 * time.Second + + notFoundExe := path.Join(testDir, "not-found") + require.NoError(t, os.RemoveAll(notFoundExe)) + defer os.Remove(notFoundExe) + + process, err := New(nil, []string{notFoundExe}, testDir) + require.NoError(t, err) + defer process.Stop() + + time.Sleep(1 * time.Second) + + pids, err := tryConnect(socketPath, 1, 1*time.Millisecond) + require.Error(t, err, "connection must fail because executable cannot be spawned") + require.Equal(t, 0, len(pids)) + + // 'Fix' the spawning problem of our process + require.NoError(t, os.Symlink(testExe, notFoundExe)) + + // After CrashWaitTime, the circuit breaker should have closed + pids, err = tryConnect(socketPath, 1, config.CrashWaitTime) + + require.NoError(t, err, "process should be accepting connections now") + require.Equal(t, 1, len(pids), "we should have received the pid of the new process") + require.True(t, pids[0] > 0, "pid sanity check") +} + +func tryConnect(socketPath string, attempts int, timeout time.Duration) (pids []int, err error) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + for j := 0; j < attempts; j++ { + var curPid int + for { + curPid, err = getPid(ctx, socketPath) + if err == nil { + break + } + + select { + case <-ctx.Done(): + return pids, ctx.Err() + case <-time.After(5 * time.Millisecond): + // sleep + } + } + if err != nil { + return pids, err + } + + pids = append(pids, curPid) + if curPid > 0 { + syscall.Kill(curPid, syscall.SIGKILL) + } + } + + return pids, err +} + +func getPid(ctx context.Context, socket string) (int, error) { + var err error + var conn net.Conn + + for { + conn, err = net.DialTimeout("unix", socket, 1*time.Millisecond) + if err == nil { + break + } + + select { + case <-ctx.Done(): + return 0, ctx.Err() + case <-time.After(5 * time.Millisecond): + // sleep + } + } + if err != nil { + return 0, err + } + defer conn.Close() + + response, err := ioutil.ReadAll(conn) + if err != nil { + return 0, err + } + + return strconv.Atoi(string(response)) +} diff --git a/internal/supervisor/test-scripts/pid-server.go b/internal/supervisor/test-scripts/pid-server.go new file mode 100644 index 000000000..f7c5e056d --- /dev/null +++ b/internal/supervisor/test-scripts/pid-server.go @@ -0,0 +1,33 @@ +package main + +import ( + "fmt" + "log" + "net" + "os" +) + +func main() { + if err := os.RemoveAll("socket"); err != nil { + log.Fatal(err) + } + + l, err := net.Listen("unix", "socket") + if err != nil { + log.Fatal(err) + } + + for { + conn, err := l.Accept() + if err != nil { + log.Print(err) + continue + } + + if _, err := fmt.Fprintf(conn, "%d", os.Getpid()); err != nil { + log.Print(err) + } + + conn.Close() + } +} |