gitlab.com/gitlab-org/gitaly.git
author    Jacob Vosmaer (GitLab) <jacob@gitlab.com>  2018-04-19 11:09:14 +0300
committer Zeger-Jan van de Weg <zegerjan@gitlab.com> 2018-04-19 11:09:14 +0300
commit    c6eabf6ab221cc214ab5733d7862b5a589673a69 (patch)
tree      a3e68594eb1bb184c5a9ae587b3c9e81c99d3d89 /internal/rubyserver
parent    cbd4a653dfe97ccfed6d5a0b703d75ead4ac4754 (diff)
Add health checks for gitaly-ruby
Diffstat (limited to 'internal/rubyserver')
-rw-r--r-- internal/rubyserver/health.go      |  31
-rw-r--r-- internal/rubyserver/health_test.go |  33
-rw-r--r-- internal/rubyserver/rubyserver.go  |   7
-rw-r--r-- internal/rubyserver/worker.go      | 123
-rw-r--r-- internal/rubyserver/worker_test.go | 142
5 files changed, 264 insertions(+), 72 deletions(-)
diff --git a/internal/rubyserver/health.go b/internal/rubyserver/health.go
new file mode 100644
index 000000000..fd7b32747
--- /dev/null
+++ b/internal/rubyserver/health.go
@@ -0,0 +1,31 @@
+package rubyserver
+
+import (
+ "net"
+ "time"
+
+ "golang.org/x/net/context"
+ "google.golang.org/grpc"
+ healthpb "google.golang.org/grpc/health/grpc_health_v1"
+)
+
+func ping(address string) error {
+ conn, err := grpc.Dial(
+ address,
+ grpc.WithInsecure(),
+ grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
+ return net.DialTimeout("unix", addr, timeout)
+ }),
+ )
+ if err != nil {
+ return err
+ }
+ defer conn.Close()
+
+ ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+ defer cancel()
+
+ client := healthpb.NewHealthClient(conn)
+ _, err = client.Check(ctx, &healthpb.HealthCheckRequest{})
+ return err
+}
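
Note: ping speaks the standard gRPC health-checking protocol (grpc.health.v1) over the worker's Unix socket, so the check only passes if the probed process registers that service; gitaly-ruby is expected to do so on its side. For illustration only, a minimal Go server that would satisfy ping looks roughly like this; the socket path is an assumption:

package main

import (
	"log"
	"net"

	"google.golang.org/grpc"
	"google.golang.org/grpc/health"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

func main() {
	// Socket path made up for this sketch.
	ln, err := net.Listen("unix", "/tmp/health-demo.socket")
	if err != nil {
		log.Fatal(err)
	}

	srv := grpc.NewServer()
	// health.NewServer() reports SERVING for the empty service name by
	// default, which matches the empty HealthCheckRequest sent by ping().
	healthpb.RegisterHealthServer(srv, health.NewServer())

	if err := srv.Serve(ln); err != nil {
		log.Fatal(err)
	}
}
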
diff --git a/internal/rubyserver/health_test.go b/internal/rubyserver/health_test.go
new file mode 100644
index 000000000..f5a870321
--- /dev/null
+++ b/internal/rubyserver/health_test.go
@@ -0,0 +1,33 @@
+package rubyserver
+
+import (
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+)
+
+func TestPingSuccess(t *testing.T) {
+ testhelper.ConfigureRuby()
+ s, err := Start()
+ require.NoError(t, err)
+ defer s.Stop()
+
+ require.True(t, len(s.workers) > 0, "expected at least one worker in server")
+ w := s.workers[0]
+
+ var pingErr error
+ for start := time.Now(); time.Since(start) < 5*time.Second; time.Sleep(100 * time.Millisecond) {
+ pingErr = ping(w.address)
+ if pingErr == nil {
+ break
+ }
+ }
+
+ require.NoError(t, pingErr, "health check should pass")
+}
+
+func TestPingFail(t *testing.T) {
+ require.Error(t, ping("fake address"), "health check should fail")
+}
diff --git a/internal/rubyserver/rubyserver.go b/internal/rubyserver/rubyserver.go
index 779bb00bb..add562df3 100644
--- a/internal/rubyserver/rubyserver.go
+++ b/internal/rubyserver/rubyserver.go
@@ -81,6 +81,7 @@ func (s *Server) Stop() {
}
for _, w := range s.workers {
+ w.stopMonitor()
w.Process.Stop()
}
}
@@ -126,13 +127,13 @@ func Start() (*Server, error) {
args := []string{"bundle", "exec", "bin/ruby-cd", wd, gitalyRuby, strconv.Itoa(os.Getpid()), socketPath}
events := make(chan supervisor.Event)
-
- p, err := supervisor.New(name, env, args, cfg.Ruby.Dir, cfg.Ruby.MaxRSS, events)
+ check := func() error { return ping(socketPath) }
+ p, err := supervisor.New(name, env, args, cfg.Ruby.Dir, cfg.Ruby.MaxRSS, events, check)
if err != nil {
return nil, err
}
- s.workers = append(s.workers, newWorker(p, socketPath, events))
+ s.workers = append(s.workers, newWorker(p, socketPath, events, false))
}
return s, nil
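
Note: the check callback handed to supervisor.New closes over the worker's socket path. The supervisor package (not part of this diff) is expected to call it periodically and translate the result into the HealthOK / HealthBad events consumed below; a hedged sketch of that contract, with an assumed function name and interval rather than the real supervisor code:

package supervisor

import "time"

// monitorHealth is an illustrative name, not the actual supervisor code.
// It turns an injected check function into the HealthOK / HealthBad events
// that the rubyserver worker monitor reacts to.
func monitorHealth(check func() error, pid int, events chan<- Event, stop <-chan struct{}) {
	ticker := time.NewTicker(15 * time.Second) // interval is an assumption
	defer ticker.Stop()

	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			if err := check(); err != nil {
				events <- Event{Type: HealthBad, Pid: pid, Error: err}
			} else {
				events <- Event{Type: HealthOK, Pid: pid}
			}
		}
	}
}
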
diff --git a/internal/rubyserver/worker.go b/internal/rubyserver/worker.go
index a06e09618..ea7c22afb 100644
--- a/internal/rubyserver/worker.go
+++ b/internal/rubyserver/worker.go
@@ -31,19 +31,24 @@ func init() {
// it if necessary, in cooperation with the balancer.
type worker struct {
*supervisor.Process
- address string
- events <-chan supervisor.Event
+ address string
+ events <-chan supervisor.Event
+ shutdown chan struct{}
// This is for testing only, so that we can inject a fake balancer
balancerUpdate chan balancerProxy
+
+ testing bool
}
-func newWorker(p *supervisor.Process, address string, events <-chan supervisor.Event) *worker {
+func newWorker(p *supervisor.Process, address string, events <-chan supervisor.Event, testing bool) *worker {
w := &worker{
Process: p,
address: address,
events: events,
+ shutdown: make(chan struct{}),
balancerUpdate: make(chan balancerProxy),
+ testing: testing,
}
go w.monitor()
@@ -69,8 +74,17 @@ type defaultBalancer struct{}
func (defaultBalancer) AddAddress(s string) { balancer.AddAddress(s) }
func (defaultBalancer) RemoveAddress(s string) bool { return balancer.RemoveAddress(s) }
+var (
+ // Ignore health checks for the current process after it just restarted
+ healthRestartCoolOff = 5 * time.Minute
+ // Health considered bad after sustained failed health checks
+ healthRestartDelay = 1 * time.Minute
+)
+
func (w *worker) monitor() {
- sw := &stopwatch{}
+ swMem := &stopwatch{}
+ swHealth := &stopwatch{}
+ lastRestart := time.Now()
currentPid := 0
bal := <-w.balancerUpdate
@@ -78,16 +92,13 @@ func (w *worker) monitor() {
nextEvent:
select {
case e := <-w.events:
- if e.Pid <= 0 {
- log.WithFields(log.Fields{
- "worker.name": w.Name,
- "worker.event_pid": e.Pid,
- }).Info("received invalid PID")
- break nextEvent
- }
-
switch e.Type {
case supervisor.Up:
+ if badPid(e.Pid) {
+ w.logBadEvent(e)
+ break nextEvent
+ }
+
if e.Pid == currentPid {
// Ignore repeated events to avoid constantly resetting our internal
// state.
@@ -96,14 +107,22 @@ func (w *worker) monitor() {
bal.AddAddress(w.address)
currentPid = e.Pid
- sw.reset()
+
+ swMem.reset()
+ swHealth.reset()
+ lastRestart = time.Now()
case supervisor.MemoryHigh:
+ if badPid(e.Pid) {
+ w.logBadEvent(e)
+ break nextEvent
+ }
+
if e.Pid != currentPid {
break nextEvent
}
- sw.mark()
- if sw.elapsed() <= config.Config.Ruby.RestartDelay {
+ swMem.mark()
+ if swMem.elapsed() <= config.Config.Ruby.RestartDelay {
break nextEvent
}
@@ -111,42 +130,94 @@ func (w *worker) monitor() {
// we may leave the system without the capacity to make gitaly-ruby
// requests.
if bal.RemoveAddress(w.address) {
- go w.waitTerminate(e.Pid)
- sw.reset()
+ w.logPid(currentPid).Info("removed from balancer due to high memory")
+ go w.waitTerminate(currentPid)
+ swMem.reset()
}
case supervisor.MemoryLow:
+ if badPid(e.Pid) {
+ w.logBadEvent(e)
+ break nextEvent
+ }
+
if e.Pid != currentPid {
break nextEvent
}
- sw.reset()
+ swMem.reset()
+ case supervisor.HealthOK:
+ swHealth.reset()
+ case supervisor.HealthBad:
+ if time.Since(lastRestart) <= healthRestartCoolOff {
+ // Ignore health checks for a while after the supervised process restarted
+ break nextEvent
+ }
+
+ w.log().WithError(e.Error).Warn("health check failed")
+
+ swHealth.mark()
+ if swHealth.elapsed() <= healthRestartDelay {
+ break nextEvent
+ }
+
+ if bal.RemoveAddress(w.address) {
+ w.logPid(currentPid).Info("removed from balancer due to sustained failing health checks")
+ go w.waitTerminate(currentPid)
+ swHealth.reset()
+ }
default:
panic(fmt.Sprintf("unknown state %v", e.Type))
}
case bal = <-w.balancerUpdate:
// For testing only.
+ case <-w.shutdown:
+ return
}
}
}
+func (w *worker) stopMonitor() {
+ close(w.shutdown)
+}
+
+func badPid(pid int) bool {
+ return pid <= 0
+}
+
+func (w *worker) log() *log.Entry {
+ return log.WithFields(log.Fields{
+ "worker.name": w.Name,
+ })
+}
+
+func (w *worker) logPid(pid int) *log.Entry {
+ return w.log().WithFields(log.Fields{
+ "worker.pid": pid,
+ })
+}
+
+func (w *worker) logBadEvent(e supervisor.Event) {
+ w.log().WithFields(log.Fields{
+ "worker.event": e,
+ }).Error("monitor state machine received bad event")
+}
+
func (w *worker) waitTerminate(pid int) {
+ if w.testing {
+ return
+ }
+
// Wait for in-flight requests to reach the worker before we slam the
// door in their face.
time.Sleep(1 * time.Minute)
terminationCounter.WithLabelValues(w.Name).Inc()
- log.WithFields(log.Fields{
- "worker.name": w.Name,
- "worker.pid": pid,
- }).Info("sending SIGTERM")
+ w.logPid(pid).Info("sending SIGTERM")
syscall.Kill(pid, syscall.SIGTERM)
time.Sleep(config.Config.Ruby.GracefulRestartTimeout)
- log.WithFields(log.Fields{
- "worker.name": w.Name,
- "worker.pid": pid,
- }).Info("sending SIGKILL")
+ w.logPid(pid).Info("sending SIGKILL")
syscall.Kill(pid, syscall.SIGKILL)
}
diff --git a/internal/rubyserver/worker_test.go b/internal/rubyserver/worker_test.go
index 70bd21f0e..145ce2b55 100644
--- a/internal/rubyserver/worker_test.go
+++ b/internal/rubyserver/worker_test.go
@@ -1,6 +1,7 @@
package rubyserver
import (
+ "errors"
"testing"
"time"
@@ -10,7 +11,8 @@ import (
)
func TestWorker(t *testing.T) {
- restartDelay := 100 * time.Millisecond
+ restartDelay := 10 * time.Millisecond
+
defer func(old time.Duration) {
config.Config.Ruby.RestartDelay = old
}(config.Config.Ruby.RestartDelay)
@@ -18,77 +20,127 @@ func TestWorker(t *testing.T) {
events := make(chan supervisor.Event)
addr := "the address"
- w := newWorker(&supervisor.Process{Name: "testing"}, addr, events)
-
- mustIgnore := func(e supervisor.Event) {
- nothing := &nothingBalancer{t}
- w.balancerUpdate <- nothing
- t.Logf("sending Event %+v, expect nothing to happen", e)
- events <- e
- // This second balancer update is used to synchronize with the monitor
- // goroutine. When the channel send finishes, we know the event we sent
- // before must have been processed.
- w.balancerUpdate <- nothing
- }
+ w := newWorker(&supervisor.Process{Name: "testing"}, addr, events, true)
- mustAdd := func(e supervisor.Event) {
- add := newAdd(t, addr)
- w.balancerUpdate <- add
- t.Logf("sending Event %+v, expect balancer add", e)
- events <- e
- add.wait()
- }
-
- mustRemove := func(e supervisor.Event) {
- remove := newRemove(t, addr)
- w.balancerUpdate <- remove
- t.Logf("sending Event %+v, expect balancer remove", e)
- events <- e
- remove.wait()
- }
+ t.Log("ignore health failures during startup")
+ mustIgnore(t, w, func() { events <- healthBadEvent() })
firstPid := 123
- mustAdd(upEvent(firstPid))
+ t.Log("register first PID as 'up'")
+ mustAdd(t, w, addr, func() { events <- upEvent(firstPid) })
t.Log("ignore repeated up event")
- mustIgnore(upEvent(firstPid))
+ mustIgnore(t, w, func() { events <- upEvent(firstPid) })
t.Log("send mem high events but too fast to trigger restart")
for i := 0; i < 5; i++ {
- mustIgnore(memHighEvent(firstPid))
+ mustIgnore(t, w, func() { events <- memHighEvent(firstPid) })
}
t.Log("mem low resets mem high counter")
- mustIgnore(memLowEvent(firstPid))
+ mustIgnore(t, w, func() { events <- memLowEvent(firstPid) })
t.Log("send mem high events but too fast to trigger restart")
for i := 0; i < 5; i++ {
- mustIgnore(memHighEvent(firstPid))
+ mustIgnore(t, w, func() { events <- memHighEvent(firstPid) })
}
time.Sleep(2 * restartDelay)
t.Log("this mem high should push us over the threshold")
- mustRemove(memHighEvent(firstPid))
+ mustRemove(t, w, addr, func() { events <- memHighEvent(firstPid) })
+
+ t.Log("ignore health failures during startup")
+ mustIgnore(t, w, func() { events <- healthBadEvent() })
secondPid := 456
- t.Log("time for a new PID")
- mustAdd(upEvent(secondPid))
+ t.Log("registering a new PID")
+ mustAdd(t, w, addr, func() { events <- upEvent(secondPid) })
t.Log("ignore mem high events for the previous pid")
- mustIgnore(memHighEvent(firstPid))
+ mustIgnore(t, w, func() { events <- memHighEvent(firstPid) })
time.Sleep(2 * restartDelay)
- mustIgnore(memHighEvent(firstPid))
+ t.Log("ignore mem high also after restart delay has expired")
+ mustIgnore(t, w, func() { events <- memHighEvent(firstPid) })
t.Log("start high memory timer")
- mustIgnore(memHighEvent(secondPid))
+ mustIgnore(t, w, func() { events <- memHighEvent(secondPid) })
t.Log("ignore mem low event for wrong pid")
- mustIgnore(memLowEvent(firstPid))
+ mustIgnore(t, w, func() { events <- memLowEvent(firstPid) })
t.Log("send mem high count over the threshold")
time.Sleep(2 * restartDelay)
- mustRemove(memHighEvent(secondPid))
+ mustRemove(t, w, addr, func() { events <- memHighEvent(secondPid) })
+}
+
+func TestWorkerHealthChecks(t *testing.T) {
+ restartDelay := 10 * time.Millisecond
+
+ defer func(old time.Duration) {
+ healthRestartDelay = old
+ }(healthRestartDelay)
+ healthRestartDelay = restartDelay
+
+ defer func(old time.Duration) {
+ healthRestartCoolOff = old
+ }(healthRestartCoolOff)
+ healthRestartCoolOff = restartDelay
+
+ events := make(chan supervisor.Event)
+ addr := "the address"
+ w := newWorker(&supervisor.Process{Name: "testing"}, addr, events, true)
+
+ t.Log("ignore health failures during startup")
+ mustIgnore(t, w, func() { events <- healthBadEvent() })
+
+ firstPid := 123
+
+ t.Log("register first PID as 'up'")
+ mustAdd(t, w, addr, func() { events <- upEvent(firstPid) })
+
+ t.Log("still ignore health failures during startup")
+ mustIgnore(t, w, func() { events <- healthBadEvent() })
+
+ time.Sleep(2 * restartDelay)
+
+ t.Log("waited long enough, this health check should start health timer")
+ mustIgnore(t, w, func() { events <- healthBadEvent() })
+
+ time.Sleep(2 * restartDelay)
+
+ t.Log("this second failed health check should trigger failover")
+ mustRemove(t, w, addr, func() { events <- healthBadEvent() })
+
+ t.Log("ignore extra health failures")
+ mustIgnore(t, w, func() { events <- healthBadEvent() })
+}
+
+func mustIgnore(t *testing.T, w *worker, f func()) {
+ nothing := &nothingBalancer{t}
+ w.balancerUpdate <- nothing
+ t.Log("executing function that should be ignored by balancer")
+ f()
+ // This second balancer update is used to synchronize with the monitor
+ // goroutine. When the channel send finishes, we know the event we sent
+ // before must have been processed.
+ w.balancerUpdate <- nothing
+}
+
+func mustAdd(t *testing.T, w *worker, addr string, f func()) {
+ add := newAdd(t, addr)
+ w.balancerUpdate <- add
+ t.Log("executing function that should lead to balancer.AddAddress")
+ f()
+ add.wait()
+}
+
+func mustRemove(t *testing.T, w *worker, addr string, f func()) {
+ remove := newRemove(t, addr)
+ w.balancerUpdate <- remove
+ t.Log("executing function that should lead to balancer.RemoveAddress")
+ f()
+ remove.wait()
}
func waitFail(t *testing.T, done chan struct{}) {
@@ -111,6 +163,10 @@ func memLowEvent(pid int) supervisor.Event {
return supervisor.Event{Type: supervisor.MemoryLow, Pid: pid}
}
+func healthBadEvent() supervisor.Event {
+ return supervisor.Event{Type: supervisor.HealthBad, Error: errors.New("test bad health")}
+}
+
func newAdd(t *testing.T, addr string) *addBalancer {
return &addBalancer{
t: t,
@@ -156,7 +212,7 @@ type removeBalancer struct {
func (rb *removeBalancer) RemoveAddress(s string) bool {
require.Equal(rb.t, rb.addr, s, "removeBalancer expected RemoveAddress argument")
close(rb.done)
- return false
+ return true
}
func (rb *removeBalancer) AddAddress(s string) {
@@ -173,7 +229,7 @@ type nothingBalancer struct {
func (nb *nothingBalancer) RemoveAddress(s string) bool {
nb.t.Fatal("unexpected RemoveAddress call")
- return false
+ return true
}
func (nb *nothingBalancer) AddAddress(s string) {