From c6eabf6ab221cc214ab5733d7862b5a589673a69 Mon Sep 17 00:00:00 2001
From: "Jacob Vosmaer (GitLab)"
Date: Thu, 19 Apr 2018 08:09:14 +0000
Subject: Add health checks for gitaly-ruby

---
 internal/rubyserver/health.go      |  31 ++++++++
 internal/rubyserver/health_test.go |  33 +++++++++
 internal/rubyserver/rubyserver.go  |   7 +-
 internal/rubyserver/worker.go      | 123 +++++++++++++++++++++++++-------
 internal/rubyserver/worker_test.go | 142 ++++++++++++++++++++++++++-----------
 5 files changed, 264 insertions(+), 72 deletions(-)
 create mode 100644 internal/rubyserver/health.go
 create mode 100644 internal/rubyserver/health_test.go

(limited to 'internal/rubyserver')

diff --git a/internal/rubyserver/health.go b/internal/rubyserver/health.go
new file mode 100644
index 000000000..fd7b32747
--- /dev/null
+++ b/internal/rubyserver/health.go
@@ -0,0 +1,31 @@
+package rubyserver
+
+import (
+    "net"
+    "time"
+
+    "golang.org/x/net/context"
+    "google.golang.org/grpc"
+    healthpb "google.golang.org/grpc/health/grpc_health_v1"
+)
+
+func ping(address string) error {
+    conn, err := grpc.Dial(
+        address,
+        grpc.WithInsecure(),
+        grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
+            return net.DialTimeout("unix", addr, timeout)
+        }),
+    )
+    if err != nil {
+        return err
+    }
+    defer conn.Close()
+
+    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+    defer cancel()
+
+    client := healthpb.NewHealthClient(conn)
+    _, err = client.Check(ctx, &healthpb.HealthCheckRequest{})
+    return err
+}
diff --git a/internal/rubyserver/health_test.go b/internal/rubyserver/health_test.go
new file mode 100644
index 000000000..f5a870321
--- /dev/null
+++ b/internal/rubyserver/health_test.go
@@ -0,0 +1,33 @@
+package rubyserver
+
+import (
+    "testing"
+    "time"
+
+    "github.com/stretchr/testify/require"
+    "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+)
+
+func TestPingSuccess(t *testing.T) {
+    testhelper.ConfigureRuby()
+    s, err := Start()
+    require.NoError(t, err)
+    defer s.Stop()
+
+    require.True(t, len(s.workers) > 0, "expected at least one worker in server")
+    w := s.workers[0]
+
+    var pingErr error
+    for start := time.Now(); time.Since(start) < 5*time.Second; time.Sleep(100 * time.Millisecond) {
+        pingErr = ping(w.address)
+        if pingErr == nil {
+            break
+        }
+    }
+
+    require.NoError(t, pingErr, "health check should pass")
+}
+
+func TestPingFail(t *testing.T) {
+    require.Error(t, ping("fake address"), "health check should fail")
+}
diff --git a/internal/rubyserver/rubyserver.go b/internal/rubyserver/rubyserver.go
index 779bb00bb..add562df3 100644
--- a/internal/rubyserver/rubyserver.go
+++ b/internal/rubyserver/rubyserver.go
@@ -81,6 +81,7 @@ func (s *Server) Stop() {
     }
 
     for _, w := range s.workers {
+        w.stopMonitor()
         w.Process.Stop()
     }
 }
@@ -126,13 +127,13 @@ func Start() (*Server, error) {
         args := []string{"bundle", "exec", "bin/ruby-cd", wd, gitalyRuby, strconv.Itoa(os.Getpid()), socketPath}

         events := make(chan supervisor.Event)
-
-        p, err := supervisor.New(name, env, args, cfg.Ruby.Dir, cfg.Ruby.MaxRSS, events)
+        check := func() error { return ping(socketPath) }
+        p, err := supervisor.New(name, env, args, cfg.Ruby.Dir, cfg.Ruby.MaxRSS, events, check)
         if err != nil {
             return nil, err
         }

-        s.workers = append(s.workers, newWorker(p, socketPath, events))
+        s.workers = append(s.workers, newWorker(p, socketPath, events, false))
     }

     return s, nil
diff --git a/internal/rubyserver/worker.go b/internal/rubyserver/worker.go
index a06e09618..ea7c22afb 100644
--- a/internal/rubyserver/worker.go
+++ b/internal/rubyserver/worker.go
@@ -31,19 +31,24 @@ func init() {
 // it if necessary, in cooperation with the balancer.
 type worker struct {
     *supervisor.Process
-    address string
-    events  <-chan supervisor.Event
+    address  string
+    events   <-chan supervisor.Event
+    shutdown chan struct{}

     // This is for testing only, so that we can inject a fake balancer
     balancerUpdate chan balancerProxy
+
+    testing bool
 }

-func newWorker(p *supervisor.Process, address string, events <-chan supervisor.Event) *worker {
+func newWorker(p *supervisor.Process, address string, events <-chan supervisor.Event, testing bool) *worker {
     w := &worker{
         Process:        p,
         address:        address,
         events:         events,
+        shutdown:       make(chan struct{}),
         balancerUpdate: make(chan balancerProxy),
+        testing:        testing,
     }

     go w.monitor()
@@ -69,8 +74,17 @@ type defaultBalancer struct{}
 func (defaultBalancer) AddAddress(s string)         { balancer.AddAddress(s) }
 func (defaultBalancer) RemoveAddress(s string) bool { return balancer.RemoveAddress(s) }

+var (
+    // Ignore health checks for the current process after it just restarted
+    healthRestartCoolOff = 5 * time.Minute
+    // Health considered bad after sustained failed health checks
+    healthRestartDelay = 1 * time.Minute
+)
+
 func (w *worker) monitor() {
-    sw := &stopwatch{}
+    swMem := &stopwatch{}
+    swHealth := &stopwatch{}
+    lastRestart := time.Now()
     currentPid := 0

     bal := <-w.balancerUpdate
@@ -78,16 +92,13 @@ nextEvent:
         select {
         case e := <-w.events:
-            if e.Pid <= 0 {
-                log.WithFields(log.Fields{
-                    "worker.name":      w.Name,
-                    "worker.event_pid": e.Pid,
-                }).Info("received invalid PID")
-                break nextEvent
-            }
-
             switch e.Type {
             case supervisor.Up:
+                if badPid(e.Pid) {
+                    w.logBadEvent(e)
+                    break nextEvent
+                }
+
                 if e.Pid == currentPid {
                     // Ignore repeated events to avoid constantly resetting our internal
                     // state.
@@ -96,14 +107,22 @@
                 bal.AddAddress(w.address)
                 currentPid = e.Pid
-                sw.reset()
+
+                swMem.reset()
+                swHealth.reset()
+                lastRestart = time.Now()
             case supervisor.MemoryHigh:
+                if badPid(e.Pid) {
+                    w.logBadEvent(e)
+                    break nextEvent
+                }
+
                 if e.Pid != currentPid {
                     break nextEvent
                 }

-                sw.mark()
-                if sw.elapsed() <= config.Config.Ruby.RestartDelay {
+                swMem.mark()
+                if swMem.elapsed() <= config.Config.Ruby.RestartDelay {
                     break nextEvent
                 }
@@ -111,42 +130,94 @@ nextEvent:
                 // we may leave the system without the capacity to make gitaly-ruby
                 // requests.
                 if bal.RemoveAddress(w.address) {
-                    go w.waitTerminate(e.Pid)
-                    sw.reset()
+                    w.logPid(currentPid).Info("removed from balancer due to high memory")
+                    go w.waitTerminate(currentPid)
+                    swMem.reset()
                 }
             case supervisor.MemoryLow:
+                if badPid(e.Pid) {
+                    w.logBadEvent(e)
+                    break nextEvent
+                }
+
                 if e.Pid != currentPid {
                     break nextEvent
                 }

-                sw.reset()
+                swMem.reset()
+            case supervisor.HealthOK:
+                swHealth.reset()
+            case supervisor.HealthBad:
+                if time.Since(lastRestart) <= healthRestartCoolOff {
+                    // Ignore health checks for a while after the supervised process restarted
+                    break nextEvent
+                }
+
+                w.log().WithError(e.Error).Warn("health check failed")
+
+                swHealth.mark()
+                if swHealth.elapsed() <= healthRestartDelay {
+                    break nextEvent
+                }
+
+                if bal.RemoveAddress(w.address) {
+                    w.logPid(currentPid).Info("removed from balancer due to sustained failing health checks")
+                    go w.waitTerminate(currentPid)
+                    swHealth.reset()
+                }
             default:
                 panic(fmt.Sprintf("unknown state %v", e.Type))
             }
         case bal = <-w.balancerUpdate:
             // For testing only.
+        case <-w.shutdown:
+            return
         }
     }
 }

+func (w *worker) stopMonitor() {
+    close(w.shutdown)
+}
+
+func badPid(pid int) bool {
+    return pid <= 0
+}
+
+func (w *worker) log() *log.Entry {
+    return log.WithFields(log.Fields{
+        "worker.name": w.Name,
+    })
+}
+
+func (w *worker) logPid(pid int) *log.Entry {
+    return w.log().WithFields(log.Fields{
+        "worker.pid": pid,
+    })
+}
+
+func (w *worker) logBadEvent(e supervisor.Event) {
+    w.log().WithFields(log.Fields{
+        "worker.event": e,
+    }).Error("monitor state machine received bad event")
+}
+
 func (w *worker) waitTerminate(pid int) {
+    if w.testing {
+        return
+    }
+
     // Wait for in-flight requests to reach the worker before we slam the
     // door in their face.
     time.Sleep(1 * time.Minute)

     terminationCounter.WithLabelValues(w.Name).Inc()

-    log.WithFields(log.Fields{
-        "worker.name": w.Name,
-        "worker.pid":  pid,
-    }).Info("sending SIGTERM")
+    w.logPid(pid).Info("sending SIGTERM")
     syscall.Kill(pid, syscall.SIGTERM)

     time.Sleep(config.Config.Ruby.GracefulRestartTimeout)

-    log.WithFields(log.Fields{
-        "worker.name": w.Name,
-        "worker.pid":  pid,
-    }).Info("sending SIGKILL")
+    w.logPid(pid).Info("sending SIGKILL")
     syscall.Kill(pid, syscall.SIGKILL)
 }
diff --git a/internal/rubyserver/worker_test.go b/internal/rubyserver/worker_test.go
index 70bd21f0e..145ce2b55 100644
--- a/internal/rubyserver/worker_test.go
+++ b/internal/rubyserver/worker_test.go
@@ -1,6 +1,7 @@
 package rubyserver

 import (
+    "errors"
     "testing"
     "time"

@@ -10,7 +11,8 @@ import (
 )

 func TestWorker(t *testing.T) {
-    restartDelay := 100 * time.Millisecond
+    restartDelay := 10 * time.Millisecond
+
     defer func(old time.Duration) {
         config.Config.Ruby.RestartDelay = old
     }(config.Config.Ruby.RestartDelay)
@@ -18,77 +20,127 @@ func TestWorker(t *testing.T) {
     events := make(chan supervisor.Event)
     addr := "the address"
-    w := newWorker(&supervisor.Process{Name: "testing"}, addr, events)
-
-    mustIgnore := func(e supervisor.Event) {
-        nothing := &nothingBalancer{t}
-        w.balancerUpdate <- nothing
-        t.Logf("sending Event %+v, expect nothing to happen", e)
-        events <- e
-        // This second balancer update is used to synchronize with the monitor
-        // goroutine. When the channel send finishes, we know the event we sent
-        // before must have been processed.
-        w.balancerUpdate <- nothing
-    }
+    w := newWorker(&supervisor.Process{Name: "testing"}, addr, events, true)

-    mustAdd := func(e supervisor.Event) {
-        add := newAdd(t, addr)
-        w.balancerUpdate <- add
-        t.Logf("sending Event %+v, expect balancer add", e)
-        events <- e
-        add.wait()
-    }
-
-    mustRemove := func(e supervisor.Event) {
-        remove := newRemove(t, addr)
-        w.balancerUpdate <- remove
-        t.Logf("sending Event %+v, expect balancer remove", e)
-        events <- e
-        remove.wait()
-    }
+    t.Log("ignore health failures during startup")
+    mustIgnore(t, w, func() { events <- healthBadEvent() })

     firstPid := 123
-    mustAdd(upEvent(firstPid))
+
+    t.Log("register first PID as 'up'")
+    mustAdd(t, w, addr, func() { events <- upEvent(firstPid) })

     t.Log("ignore repeated up event")
-    mustIgnore(upEvent(firstPid))
+    mustIgnore(t, w, func() { events <- upEvent(firstPid) })

     t.Log("send mem high events but too fast to trigger restart")
     for i := 0; i < 5; i++ {
-        mustIgnore(memHighEvent(firstPid))
+        mustIgnore(t, w, func() { events <- memHighEvent(firstPid) })
     }

     t.Log("mem low resets mem high counter")
-    mustIgnore(memLowEvent(firstPid))
+    mustIgnore(t, w, func() { events <- memLowEvent(firstPid) })

     t.Log("send mem high events but too fast to trigger restart")
     for i := 0; i < 5; i++ {
-        mustIgnore(memHighEvent(firstPid))
+        mustIgnore(t, w, func() { events <- memHighEvent(firstPid) })
     }
     time.Sleep(2 * restartDelay)

     t.Log("this mem high should push us over the threshold")
-    mustRemove(memHighEvent(firstPid))
+    mustRemove(t, w, addr, func() { events <- memHighEvent(firstPid) })
+
+    t.Log("ignore health failures during startup")
+    mustIgnore(t, w, func() { events <- healthBadEvent() })

     secondPid := 456
-    t.Log("time for a new PID")
-    mustAdd(upEvent(secondPid))
+    t.Log("registering a new PID")
+    mustAdd(t, w, addr, func() { events <- upEvent(secondPid) })

     t.Log("ignore mem high events for the previous pid")
-    mustIgnore(memHighEvent(firstPid))
+    mustIgnore(t, w, func() { events <- memHighEvent(firstPid) })
     time.Sleep(2 * restartDelay)
-    mustIgnore(memHighEvent(firstPid))
+    t.Log("ignore mem high also after restart delay has expired")
+    mustIgnore(t, w, func() { events <- memHighEvent(firstPid) })

     t.Log("start high memory timer")
-    mustIgnore(memHighEvent(secondPid))
+    mustIgnore(t, w, func() { events <- memHighEvent(secondPid) })

     t.Log("ignore mem low event for wrong pid")
-    mustIgnore(memLowEvent(firstPid))
+    mustIgnore(t, w, func() { events <- memLowEvent(firstPid) })

     t.Log("send mem high count over the threshold")
     time.Sleep(2 * restartDelay)
-    mustRemove(memHighEvent(secondPid))
+    mustRemove(t, w, addr, func() { events <- memHighEvent(secondPid) })
+}
+
+func TestWorkerHealthChecks(t *testing.T) {
+    restartDelay := 10 * time.Millisecond
+
+    defer func(old time.Duration) {
+        healthRestartDelay = old
+    }(healthRestartDelay)
+    healthRestartDelay = restartDelay
+
+    defer func(old time.Duration) {
+        healthRestartCoolOff = old
+    }(healthRestartCoolOff)
+    healthRestartCoolOff = restartDelay
+
+    events := make(chan supervisor.Event)
+    addr := "the address"
+    w := newWorker(&supervisor.Process{Name: "testing"}, addr, events, true)
+
+    t.Log("ignore health failures during startup")
+    mustIgnore(t, w, func() { events <- healthBadEvent() })
+
+    firstPid := 123
+
+    t.Log("register first PID as 'up'")
+    mustAdd(t, w, addr, func() { events <- upEvent(firstPid) })
+
+    t.Log("still ignore health failures during startup")
+    mustIgnore(t, w, func() { events <- healthBadEvent() })
+
+    time.Sleep(2 * restartDelay)
+
+    t.Log("waited long enough, this health check should start health timer")
+    mustIgnore(t, w, func() { events <- healthBadEvent() })
+
+    time.Sleep(2 * restartDelay)
+
+    t.Log("this second failed health check should trigger failover")
+    mustRemove(t, w, addr, func() { events <- healthBadEvent() })
+
+    t.Log("ignore extra health failures")
+    mustIgnore(t, w, func() { events <- healthBadEvent() })
+}
+
+func mustIgnore(t *testing.T, w *worker, f func()) {
+    nothing := &nothingBalancer{t}
+    w.balancerUpdate <- nothing
+    t.Log("executing function that should be ignored by balancer")
+    f()
+    // This second balancer update is used to synchronize with the monitor
+    // goroutine. When the channel send finishes, we know the event we sent
+    // before must have been processed.
+    w.balancerUpdate <- nothing
+}
+
+func mustAdd(t *testing.T, w *worker, addr string, f func()) {
+    add := newAdd(t, addr)
+    w.balancerUpdate <- add
+    t.Log("executing function that should lead to balancer.AddAddress")
+    f()
+    add.wait()
+}
+
+func mustRemove(t *testing.T, w *worker, addr string, f func()) {
+    remove := newRemove(t, addr)
+    w.balancerUpdate <- remove
+    t.Log("executing function that should lead to balancer.RemoveAddress")
+    f()
+    remove.wait()
 }

 func waitFail(t *testing.T, done chan struct{}) {
@@ -111,6 +163,10 @@ func memLowEvent(pid int) supervisor.Event {
     return supervisor.Event{Type: supervisor.MemoryLow, Pid: pid}
 }

+func healthBadEvent() supervisor.Event {
+    return supervisor.Event{Type: supervisor.HealthBad, Error: errors.New("test bad health")}
+}
+
 func newAdd(t *testing.T, addr string) *addBalancer {
     return &addBalancer{
         t:    t,
@@ -156,7 +212,7 @@ type removeBalancer struct {
 func (rb *removeBalancer) RemoveAddress(s string) bool {
     require.Equal(rb.t, rb.addr, s, "removeBalancer expected RemoveAddress argument")
     close(rb.done)
-    return false
+    return true
 }

 func (rb *removeBalancer) AddAddress(s string) {
@@ -173,7 +229,7 @@ type nothingBalancer struct {
 func (nb *nothingBalancer) RemoveAddress(s string) bool {
     nb.t.Fatal("unexpected RemoveAddress call")
-    return false
+    return true
 }

 func (nb *nothingBalancer) AddAddress(s string) {
-- 
cgit v1.2.3