diff options
author | Andrew Newdigate <andrew@gitlab.com> | 2017-12-07 19:55:43 +0300 |
---|---|---|
committer | Andrew Newdigate <andrew@gitlab.com> | 2017-12-07 19:55:43 +0300 |
commit | 113346b9589393d94ec9ba97b4fe8f631640f375 (patch) | |
tree | 43e482e7377f40f08d8878f2c46f1c4cda28eb44 | |
parent | 164adab67e9f28b7ece915996c3dc5ca130e646d (diff) | |
parent | 776c0e6505c8ba14c0a5fbeb21f4cb78af2dc401 (diff) |
Merge branch 'balancer' into 'master'
Restart gitaly-ruby when it uses too much memory
Closes #720
See merge request gitlab-org/gitaly!465
-rw-r--r-- | CHANGELOG.md | 5 | ||||
-rw-r--r-- | config.toml.example | 9 | ||||
-rw-r--r-- | doc/configuration/README.md | 17 | ||||
-rw-r--r-- | internal/config/config.go | 8 | ||||
-rw-r--r-- | internal/config/ruby.go | 45 | ||||
-rw-r--r-- | internal/rubyserver/balancer/balancer.go | 184 | ||||
-rw-r--r-- | internal/rubyserver/balancer/balancer_test.go | 67 | ||||
-rw-r--r-- | internal/rubyserver/rubyserver.go | 50 | ||||
-rw-r--r-- | internal/rubyserver/stopwatch.go | 35 | ||||
-rw-r--r-- | internal/rubyserver/worker.go | 155 | ||||
-rw-r--r-- | internal/rubyserver/worker_test.go | 181 | ||||
-rw-r--r-- | internal/supervisor/events.go | 20 | ||||
-rw-r--r-- | internal/supervisor/monitor.go | 18 | ||||
-rw-r--r-- | internal/supervisor/supervisor.go | 58 | ||||
-rw-r--r-- | internal/supervisor/supervisor_test.go | 6 | ||||
-rwxr-xr-x | ruby/bin/gitaly-ruby | 15 |
16 files changed, 828 insertions, 45 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 11f546e63..fdfe85417 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Gitaly changelog +UNRELEASED + +- Restart gitaly-ruby when it uses too much memory + https://gitlab.com/gitlab-org/gitaly/merge_requests/465 + v0.58.0 - Implement RepostoryService::Fsck diff --git a/config.toml.example b/config.toml.example index 2ad2fd4ef..10f8cce49 100644 --- a/config.toml.example +++ b/config.toml.example @@ -42,6 +42,15 @@ path = "/home/git/repositories" # The directory where gitaly-ruby is installed dir = "/home/git/gitaly/ruby" +# # Gitaly-ruby resident set size (RSS) that triggers a memory restart (bytes) +# max_rss = 300000000 +# +# # Grace period before a gitaly-ruby process is forcibly terminated after exceeding max_rss (seconds) +# graceful_restart_timeout = "10m" +# +# # Time that gitaly-ruby memory must remain high before a restart (seconds) +# restart_delay = "5m" + [gitlab-shell] # The directory where gitlab-shell is installed dir = "/home/git/gitlab-shell" diff --git a/doc/configuration/README.md b/doc/configuration/README.md index 2aa1940f8..a5e9b5265 100644 --- a/doc/configuration/README.md +++ b/doc/configuration/README.md @@ -94,6 +94,23 @@ match those in gitlab.yml. |path|string|yes|Path to storage shard| |name|string|yes|Name of storage shard| +### gitaly-ruby + +A Gitaly process uses one or more gitaly-ruby helper processes to +execute RPC's implemented in Ruby instead of Go. The `[gitaly-ruby]` +section of the config file contains settings for these helper processes. + +These processes are known to occasionally suffer from memory leaks. +Gitaly restarts its gitaly-ruby helpers when there memory exceeds the +max\_rss limit. + +|name|type|required|notes| +|----|----|--------|-----| +|dir|string|yes|Path to where gitaly-ruby is installed (needed to boot the process).| +|max_rss|integer|no|Resident set size limit that triggers a gitaly-ruby restart, in bytes. Default 300MB.| +|graceful_restart_timeout|string|no|Grace period to allow a gitaly-ruby process to finish ongoing requests. Default 10 minutes ("10m").| +|restart_delay|string|no|Time memory must be high before a restart is triggered, in seconds. Default 5 minutes ("5m").| + ## Environment variables ### GITALY_DEBUG diff --git a/internal/config/config.go b/internal/config/config.go index 681525230..db2205b91 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -116,14 +116,6 @@ func validateShell() error { return validateIsDirectory(Config.GitlabShell.Dir, "gitlab-shell.dir") } -func validateRuby() error { - if len(Config.Ruby.Dir) == 0 { - return fmt.Errorf("gitaly-ruby.dir is not set") - } - - return validateIsDirectory(Config.Ruby.Dir, "gitaly-ruby.dir") -} - func validateIsDirectory(path, name string) error { s, err := os.Stat(path) if err != nil { diff --git a/internal/config/ruby.go b/internal/config/ruby.go index 71ad92d56..dbafae1f3 100644 --- a/internal/config/ruby.go +++ b/internal/config/ruby.go @@ -1,6 +1,49 @@ package config +import ( + "fmt" + "time" +) + // Ruby contains setting for Ruby worker processes type Ruby struct { - Dir string `toml:"dir"` + Dir string `toml:"dir"` + MaxRSS int `toml:"max_rss"` + GracefulRestartTimeout time.Duration + GracefulRestartTimeoutToml duration `toml:"graceful_restart_timeout"` + RestartDelay time.Duration + RestartDelayToml duration `toml:"restart_delay"` +} + +// This type is a trick to let our TOML library parse durations from strings. +type duration struct { + time.Duration +} + +func (d *duration) UnmarshalText(text []byte) error { + var err error + d.Duration, err = time.ParseDuration(string(text)) + return err +} + +func validateRuby() error { + Config.Ruby.GracefulRestartTimeout = Config.Ruby.GracefulRestartTimeoutToml.Duration + if Config.Ruby.GracefulRestartTimeout == 0 { + Config.Ruby.GracefulRestartTimeout = 10 * time.Minute + } + + if Config.Ruby.MaxRSS == 0 { + Config.Ruby.MaxRSS = 300 * 1024 * 1024 + } + + Config.Ruby.RestartDelay = Config.Ruby.RestartDelayToml.Duration + if Config.Ruby.RestartDelay == 0 { + Config.Ruby.RestartDelay = 5 * time.Minute + } + + if len(Config.Ruby.Dir) == 0 { + return fmt.Errorf("gitaly-ruby.dir is not set") + } + + return validateIsDirectory(Config.Ruby.Dir, "gitaly-ruby.dir") } diff --git a/internal/rubyserver/balancer/balancer.go b/internal/rubyserver/balancer/balancer.go new file mode 100644 index 000000000..93d51d39e --- /dev/null +++ b/internal/rubyserver/balancer/balancer.go @@ -0,0 +1,184 @@ +package balancer + +// In this package we manage a global pool of addresses for gitaly-ruby, +// accessed via the gitaly-ruby:// scheme. The interface consists of the +// AddAddress and RemoveAddress methods. RemoveAddress returns a boolean +// indicating whether the address was removed; this is intended to give +// back-pressure against repeated process restarts. +// +// The gitaly-ruby:// scheme exists because that is the way we can +// interact with the internal client-side loadbalancer of grpc-go. A URL +// for this scheme would be gitaly-ruby://foobar. For gitaly-ruby:// +// URL's, the host and port are ignored. So gitaly-ruby://foobar is +// actually a working, valid address. +// +// Strictly speaking this package implements a gRPC 'Resolver'. This +// resolver feeds address list updates to a gRPC 'balancer' which +// interacts with the gRPC client connection machinery. A resolver +// consists of a Builder which returns Resolver instances. Our Builder +// manages the address pool and notifies its Resolver instances of +// changes, which they then propagate into the gRPC library. +// + +import ( + "google.golang.org/grpc/resolver" +) + +var ( + lbBuilder = newBuilder() +) + +func init() { + resolver.Register(lbBuilder) +} + +// AddAddress adds the address of a gitaly-ruby instance to the load +// balancer. +func AddAddress(a string) { + lbBuilder.addAddress <- a +} + +// RemoveAddress removes the address of a gitaly-ruby instance from the +// load balancer. Returns false if the pool is too small to remove the +// address. +func RemoveAddress(addr string) bool { + ok := make(chan bool) + lbBuilder.removeAddress <- addressRemoval{ok: ok, addr: addr} + return <-ok +} + +type addressRemoval struct { + addr string + ok chan<- bool +} + +type addressUpdate struct { + addrs []resolver.Address + next chan struct{} +} + +type builder struct { + addAddress chan string + removeAddress chan addressRemoval + addressUpdates chan addressUpdate +} + +func newBuilder() *builder { + b := &builder{ + addAddress: make(chan string), + removeAddress: make(chan addressRemoval), + addressUpdates: make(chan addressUpdate), + } + go b.monitor() + + return b +} + +// Scheme is the name of the address scheme that makes gRPC select this resolver. +const Scheme = "gitaly-ruby" + +func (*builder) Scheme() string { return Scheme } + +// Build ignores its resolver.Target argument. That means it does not +// care what "address" the caller wants to resolve. We always resolve to +// the current list of address for local gitaly-ruby processes. +func (b *builder) Build(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOption) (resolver.Resolver, error) { + return newGitalyResolver(cc, b.addressUpdates), nil +} + +// monitor serves address list requests and handles address updates. +func (b *builder) monitor() { + addresses := make(map[string]struct{}) + notify := make(chan struct{}) + + for { + au := addressUpdate{next: notify} + for a := range addresses { + au.addrs = append(au.addrs, resolver.Address{Addr: a}) + } + + select { + case b.addressUpdates <- au: + if len(au.addrs) == 0 { + panic("builder monitor sent empty address update") + } + case addr := <-b.addAddress: + addresses[addr] = struct{}{} + notify = broadcast(notify) + case removal := <-b.removeAddress: + _, addressKnown := addresses[removal.addr] + if !addressKnown || len(addresses) <= 1 { + removal.ok <- false + break + } + + delete(addresses, removal.addr) + removal.ok <- true + notify = broadcast(notify) + } + } +} + +// broadcast returns a fresh channel because we can only close them once +func broadcast(ch chan struct{}) chan struct{} { + close(ch) + return make(chan struct{}) +} + +// gitalyResolver propagates address list updates to a +// resolver.ClientConn instance +type gitalyResolver struct { + clientConn resolver.ClientConn + + started chan struct{} + done chan struct{} + resolveNow chan struct{} + addressUpdates chan addressUpdate +} + +func newGitalyResolver(cc resolver.ClientConn, auCh chan addressUpdate) *gitalyResolver { + r := &gitalyResolver{ + started: make(chan struct{}), + done: make(chan struct{}), + resolveNow: make(chan struct{}), + addressUpdates: auCh, + clientConn: cc, + } + go r.monitor() + + // Don't return until we have sent at least one address update. This is + // meant to avoid panics inside the grpc-go library. + <-r.started + + return r +} + +func (r *gitalyResolver) ResolveNow(resolver.ResolveNowOption) { + r.resolveNow <- struct{}{} +} + +func (r *gitalyResolver) Close() { + close(r.done) +} + +func (r *gitalyResolver) monitor() { + notify := r.sendUpdate() + close(r.started) + + for { + select { + case <-notify: + notify = r.sendUpdate() + case <-r.resolveNow: + notify = r.sendUpdate() + case <-r.done: + return + } + } +} + +func (r *gitalyResolver) sendUpdate() chan struct{} { + au := <-r.addressUpdates + r.clientConn.NewAddress(au.addrs) + return au.next +} diff --git a/internal/rubyserver/balancer/balancer_test.go b/internal/rubyserver/balancer/balancer_test.go new file mode 100644 index 000000000..a4d13493e --- /dev/null +++ b/internal/rubyserver/balancer/balancer_test.go @@ -0,0 +1,67 @@ +package balancer + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestRemovals(t *testing.T) { + okActions := []action{ + {add: "foo"}, + {add: "bar"}, + {add: "qux"}, + {remove: "bar"}, + {remove: "foo"}, + } + + testCases := []struct { + desc string + actions []action + lastFails bool + }{ + { + desc: "add then remove", + actions: okActions, + }, + { + desc: "remove last address", + actions: append(okActions, action{remove: "qux"}), + lastFails: true, + }, + { + desc: "remove unknown address", + actions: []action{ + {add: "foo"}, + {remove: "bar"}, + }, + lastFails: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + // This breaks integration with gRPC and causes a monitor goroutine leak. + // Not a problem for this test. + lbBuilder = newBuilder() + + for i, a := range tc.actions { + if a.add != "" { + AddAddress(a.add) + } else { + expected := true + if i+1 == len(tc.actions) && tc.lastFails { + expected = false + } + + require.Equal(t, expected, RemoveAddress(a.remove), "expected result from removing %q", a.remove) + } + } + }) + } +} + +type action struct { + add string + remove string +} diff --git a/internal/rubyserver/rubyserver.go b/internal/rubyserver/rubyserver.go index 8753e213d..821529efa 100644 --- a/internal/rubyserver/rubyserver.go +++ b/internal/rubyserver/rubyserver.go @@ -13,6 +13,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/command" "gitlab.com/gitlab-org/gitaly/internal/config" + "gitlab.com/gitlab-org/gitaly/internal/rubyserver/balancer" "gitlab.com/gitlab-org/gitaly/internal/supervisor" "gitlab.com/gitlab-org/gitaly/streamio" @@ -53,17 +54,17 @@ func prepareSocketPath() { } } -func socketPath() string { +func socketPath(id int) string { if socketDir == "" { panic("socketDir is not set") } - return path.Join(filepath.Clean(socketDir), "socket") + return path.Join(filepath.Clean(socketDir), fmt.Sprintf("socket.%d", id)) } // Server represents a gitaly-ruby helper process. type Server struct { - *supervisor.Process + workers []*worker clientConnMu sync.RWMutex clientConn *grpc.ClientConn } @@ -77,8 +78,8 @@ func (s *Server) Stop() { s.clientConn.Close() } - if s.Process != nil { - s.Process.Stop() + for _, w := range s.workers { + w.Process.Stop() } } @@ -97,21 +98,36 @@ func Start() (*Server, error) { } cfg := config.Config - env := []string{ - "GITALY_RUBY_GIT_BIN_PATH=" + command.GitPath(), + env := append( + os.Environ(), + "GITALY_RUBY_GIT_BIN_PATH="+command.GitPath(), fmt.Sprintf("GITALY_RUBY_WRITE_BUFFER_SIZE=%d", streamio.WriteBufferSize), - "GITALY_RUBY_GITLAB_SHELL_PATH=" + cfg.GitlabShell.Dir, - "GITALY_RUBY_GITALY_BIN_DIR=" + cfg.BinDir, - } + "GITALY_RUBY_GITLAB_SHELL_PATH="+cfg.GitlabShell.Dir, + "GITALY_RUBY_GITALY_BIN_DIR="+cfg.BinDir, + ) gitalyRuby := path.Join(cfg.Ruby.Dir, "bin/gitaly-ruby") - // Use 'ruby-cd' to make sure gitaly-ruby has the same working directory - // as the current process. This is a hack to sort-of support relative - // Unix socket paths. - args := []string{"bundle", "exec", "bin/ruby-cd", wd, gitalyRuby, fmt.Sprintf("%d", os.Getpid()), socketPath()} + s := &Server{} + for i := 0; i < numWorkers; i++ { + name := fmt.Sprintf("gitaly-ruby.%d", i) + socketPath := socketPath(i) + + // Use 'ruby-cd' to make sure gitaly-ruby has the same working directory + // as the current process. This is a hack to sort-of support relative + // Unix socket paths. + args := []string{"bundle", "exec", "bin/ruby-cd", wd, gitalyRuby, strconv.Itoa(os.Getpid()), socketPath} + + events := make(chan supervisor.Event) + + p, err := supervisor.New(name, env, args, cfg.Ruby.Dir, cfg.Ruby.MaxRSS, events) + if err != nil { + return nil, err + } + + s.workers = append(s.workers, newWorker(p, socketPath, events)) + } - p, err := supervisor.New("gitaly-ruby", append(os.Environ(), env...), args, cfg.Ruby.Dir) - return &Server{Process: p}, err + return s, nil } // CommitServiceClient returns a CommitServiceClient instance that is @@ -185,7 +201,7 @@ func (s *Server) createConnection(ctx context.Context) (*grpc.ClientConn, error) dialCtx, cancel := context.WithTimeout(ctx, ConnectTimeout) defer cancel() - conn, err := grpc.DialContext(dialCtx, socketPath(), dialOptions()...) + conn, err := grpc.DialContext(dialCtx, balancer.Scheme+"://gitaly-ruby", dialOptions()...) if err != nil { return nil, err } diff --git a/internal/rubyserver/stopwatch.go b/internal/rubyserver/stopwatch.go new file mode 100644 index 000000000..b082cb155 --- /dev/null +++ b/internal/rubyserver/stopwatch.go @@ -0,0 +1,35 @@ +package rubyserver + +import ( + "time" +) + +type stopwatch struct { + t1 time.Time + t2 time.Time + running bool +} + +// mark records the current time and starts the stopwatch if it is not already running +func (st *stopwatch) mark() { + st.t2 = time.Now() + + if !st.running { + st.t1 = st.t2 + st.running = true + } +} + +// reset stops the stopwatch and returns it to zero +func (st *stopwatch) reset() { + st.running = false +} + +// elapsed returns the time elapsed between the first and last 'mark' +func (st *stopwatch) elapsed() time.Duration { + if !st.running { + return time.Duration(0) + } + + return st.t2.Sub(st.t1) +} diff --git a/internal/rubyserver/worker.go b/internal/rubyserver/worker.go new file mode 100644 index 000000000..79f80334b --- /dev/null +++ b/internal/rubyserver/worker.go @@ -0,0 +1,155 @@ +package rubyserver + +import ( + "fmt" + "syscall" + "time" + + "gitlab.com/gitlab-org/gitaly/internal/config" + "gitlab.com/gitlab-org/gitaly/internal/rubyserver/balancer" + "gitlab.com/gitlab-org/gitaly/internal/supervisor" + + "github.com/prometheus/client_golang/prometheus" + log "github.com/sirupsen/logrus" +) + +const ( + // We might want numWorkers to be configurable in the future. But at the + // moment we only support 'pick first' balancing so there is no point in + // having more than 2 workers yet. + numWorkers = 2 +) + +var ( + terminationCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "gitaly_ruby_memory_terminations_total", + Help: "Number of times gitaly-ruby has been terminated because of excessive memory use.", + }, + []string{"name"}, + ) +) + +func init() { + prometheus.MustRegister(terminationCounter) +} + +// worker observes the event stream of a supervised process and restarts +// it if necessary, in cooperation with the balancer. +type worker struct { + *supervisor.Process + address string + events <-chan supervisor.Event + + // This is for testing only, so that we can inject a fake balancer + balancerUpdate chan balancerProxy +} + +func newWorker(p *supervisor.Process, address string, events <-chan supervisor.Event) *worker { + w := &worker{ + Process: p, + address: address, + events: events, + balancerUpdate: make(chan balancerProxy), + } + go w.monitor() + + bal := defaultBalancer{} + w.balancerUpdate <- bal + + // When we return from this function, requests may start coming in. If + // there are no addresses in the balancer when the first request comes in + // we can get a panic from grpc-go. So before returning, we ensure the + // current address has been added to the balancer. + bal.AddAddress(w.address) + + return w +} + +type balancerProxy interface { + AddAddress(string) + RemoveAddress(string) bool +} + +type defaultBalancer struct{} + +func (defaultBalancer) AddAddress(s string) { balancer.AddAddress(s) } +func (defaultBalancer) RemoveAddress(s string) bool { return balancer.RemoveAddress(s) } + +func (w *worker) monitor() { + sw := &stopwatch{} + currentPid := 0 + bal := <-w.balancerUpdate + + for { + nextEvent: + select { + case e := <-w.events: + if e.Pid <= 0 { + log.WithFields(log.Fields{ + "worker.name": w.Name, + "worker.event_pid": e.Pid, + }).Info("received invalid PID") + break nextEvent + } + + switch e.Type { + case supervisor.Up: + if e.Pid == currentPid { + // Ignore repeated events to avoid constantly resetting our internal + // state. + break nextEvent + } + + bal.AddAddress(w.address) + currentPid = e.Pid + sw.reset() + case supervisor.MemoryHigh: + if e.Pid != currentPid { + break nextEvent + } + + sw.mark() + if sw.elapsed() <= config.Config.Ruby.RestartDelay { + break nextEvent + } + + // It is crucial to check the return value of RemoveAddress. If we don't + // we may leave the system without the capacity to make gitaly-ruby + // requests. + if bal.RemoveAddress(w.address) { + go w.waitTerminate(e.Pid) + sw.reset() + } + case supervisor.MemoryLow: + if e.Pid != currentPid { + break nextEvent + } + + sw.reset() + default: + panic(fmt.Sprintf("unknown state %v", e.Type)) + } + case bal = <-w.balancerUpdate: + // For testing only. + } + } +} + +func (w *worker) waitTerminate(pid int) { + terminationCounter.WithLabelValues(w.Name).Inc() + + log.WithFields(log.Fields{ + "worker.name": w.Name, + "worker.pid": pid, + }).Info("sending SIGTERM") + syscall.Kill(pid, syscall.SIGTERM) + + time.Sleep(config.Config.Ruby.GracefulRestartTimeout) + + log.WithFields(log.Fields{ + "worker.name": w.Name, + "worker.pid": pid, + }).Info("sending SIGKILL") + syscall.Kill(pid, syscall.SIGKILL) +} diff --git a/internal/rubyserver/worker_test.go b/internal/rubyserver/worker_test.go new file mode 100644 index 000000000..70bd21f0e --- /dev/null +++ b/internal/rubyserver/worker_test.go @@ -0,0 +1,181 @@ +package rubyserver + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/config" + "gitlab.com/gitlab-org/gitaly/internal/supervisor" +) + +func TestWorker(t *testing.T) { + restartDelay := 100 * time.Millisecond + defer func(old time.Duration) { + config.Config.Ruby.RestartDelay = old + }(config.Config.Ruby.RestartDelay) + config.Config.Ruby.RestartDelay = restartDelay + + events := make(chan supervisor.Event) + addr := "the address" + w := newWorker(&supervisor.Process{Name: "testing"}, addr, events) + + mustIgnore := func(e supervisor.Event) { + nothing := ¬hingBalancer{t} + w.balancerUpdate <- nothing + t.Logf("sending Event %+v, expect nothing to happen", e) + events <- e + // This second balancer update is used to synchronize with the monitor + // goroutine. When the channel send finishes, we know the event we sent + // before must have been processed. + w.balancerUpdate <- nothing + } + + mustAdd := func(e supervisor.Event) { + add := newAdd(t, addr) + w.balancerUpdate <- add + t.Logf("sending Event %+v, expect balancer add", e) + events <- e + add.wait() + } + + mustRemove := func(e supervisor.Event) { + remove := newRemove(t, addr) + w.balancerUpdate <- remove + t.Logf("sending Event %+v, expect balancer remove", e) + events <- e + remove.wait() + } + + firstPid := 123 + + mustAdd(upEvent(firstPid)) + + t.Log("ignore repeated up event") + mustIgnore(upEvent(firstPid)) + + t.Log("send mem high events but too fast to trigger restart") + for i := 0; i < 5; i++ { + mustIgnore(memHighEvent(firstPid)) + } + + t.Log("mem low resets mem high counter") + mustIgnore(memLowEvent(firstPid)) + + t.Log("send mem high events but too fast to trigger restart") + for i := 0; i < 5; i++ { + mustIgnore(memHighEvent(firstPid)) + } + + time.Sleep(2 * restartDelay) + t.Log("this mem high should push us over the threshold") + mustRemove(memHighEvent(firstPid)) + + secondPid := 456 + t.Log("time for a new PID") + mustAdd(upEvent(secondPid)) + + t.Log("ignore mem high events for the previous pid") + mustIgnore(memHighEvent(firstPid)) + time.Sleep(2 * restartDelay) + mustIgnore(memHighEvent(firstPid)) + + t.Log("start high memory timer") + mustIgnore(memHighEvent(secondPid)) + + t.Log("ignore mem low event for wrong pid") + mustIgnore(memLowEvent(firstPid)) + + t.Log("send mem high count over the threshold") + time.Sleep(2 * restartDelay) + mustRemove(memHighEvent(secondPid)) +} + +func waitFail(t *testing.T, done chan struct{}) { + select { + case <-time.After(1 * time.Second): + t.Fatal("timeout waiting for balancer method call") + case <-done: + } +} + +func upEvent(pid int) supervisor.Event { + return supervisor.Event{Type: supervisor.Up, Pid: pid} +} + +func memHighEvent(pid int) supervisor.Event { + return supervisor.Event{Type: supervisor.MemoryHigh, Pid: pid} +} + +func memLowEvent(pid int) supervisor.Event { + return supervisor.Event{Type: supervisor.MemoryLow, Pid: pid} +} + +func newAdd(t *testing.T, addr string) *addBalancer { + return &addBalancer{ + t: t, + addr: addr, + done: make(chan struct{}), + } +} + +type addBalancer struct { + addr string + t *testing.T + done chan struct{} +} + +func (ab *addBalancer) RemoveAddress(string) bool { + ab.t.Fatal("unexpected RemoveAddress call") + return false +} + +func (ab *addBalancer) AddAddress(s string) { + require.Equal(ab.t, ab.addr, s, "addBalancer expected AddAddress argument") + close(ab.done) +} + +func (ab *addBalancer) wait() { + waitFail(ab.t, ab.done) +} + +func newRemove(t *testing.T, addr string) *removeBalancer { + return &removeBalancer{ + t: t, + addr: addr, + done: make(chan struct{}), + } +} + +type removeBalancer struct { + addr string + t *testing.T + done chan struct{} +} + +func (rb *removeBalancer) RemoveAddress(s string) bool { + require.Equal(rb.t, rb.addr, s, "removeBalancer expected RemoveAddress argument") + close(rb.done) + return false +} + +func (rb *removeBalancer) AddAddress(s string) { + rb.t.Fatal("unexpected AddAddress call") +} + +func (rb *removeBalancer) wait() { + waitFail(rb.t, rb.done) +} + +type nothingBalancer struct { + t *testing.T +} + +func (nb *nothingBalancer) RemoveAddress(s string) bool { + nb.t.Fatal("unexpected RemoveAddress call") + return false +} + +func (nb *nothingBalancer) AddAddress(s string) { + nb.t.Fatal("unexpected AddAddress call") +} diff --git a/internal/supervisor/events.go b/internal/supervisor/events.go new file mode 100644 index 000000000..19822609e --- /dev/null +++ b/internal/supervisor/events.go @@ -0,0 +1,20 @@ +package supervisor + +// EventType is used to label Event instances. +type EventType int + +const ( + // Up is a notification that the process with the accompanying PID is up. + Up EventType = iota + // MemoryHigh is a notification that process memory for the current PID + // exceeds the threshold. + MemoryHigh + // MemoryLow indicates the process memory is at or below the threshold. + MemoryLow +) + +// Event is used to notify a listener of process state changes. +type Event struct { + Type EventType + Pid int +} diff --git a/internal/supervisor/monitor.go b/internal/supervisor/monitor.go index 443e212fa..447f906bd 100644 --- a/internal/supervisor/monitor.go +++ b/internal/supervisor/monitor.go @@ -29,7 +29,7 @@ type monitorProcess struct { wait <-chan struct{} } -func monitorRss(procs <-chan monitorProcess, done chan<- struct{}) { +func monitorRss(procs <-chan monitorProcess, done chan<- struct{}, events chan<- Event, threshold int) { t := time.NewTicker(15 * time.Second) defer t.Stop() @@ -38,7 +38,21 @@ func monitorRss(procs <-chan monitorProcess, done chan<- struct{}) { for mp := range procs { monitorLoop: for { - rssGauge.WithLabelValues(mp.name).Set(float64(1024 * getRss(mp.pid))) + rss := 1024 * getRss(mp.pid) + rssGauge.WithLabelValues(mp.name).Set(float64(rss)) + + if rss > 0 { + event := Event{Type: MemoryLow, Pid: mp.pid} + if rss > threshold { + event.Type = MemoryHigh + } + + select { + case events <- event: + default: + // The default case makes this non-blocking + } + } select { case <-mp.wait: diff --git a/internal/supervisor/supervisor.go b/internal/supervisor/supervisor.go index d13fed3dc..c862ec723 100644 --- a/internal/supervisor/supervisor.go +++ b/internal/supervisor/supervisor.go @@ -7,6 +7,7 @@ import ( "time" "github.com/kelseyhightower/envconfig" + "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" ) @@ -21,17 +22,29 @@ type Config struct { } var ( + startCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "gitaly_supervisor_starts_total", + Help: "Number of starts of supervised processes.", + }, + []string{"name"}, + ) + config Config ) func init() { envconfig.MustProcess("gitaly_supervisor", &config) + prometheus.MustRegister(startCounter) } // Process represents a running process. type Process struct { Name string + memoryThreshold int + events chan<- Event + // Information to start the process env []string args []string @@ -44,18 +57,20 @@ type Process struct { } // New creates a new proces instance. -func New(name string, env []string, args []string, dir string) (*Process, error) { +func New(name string, env []string, args []string, dir string, memoryThreshold int, events chan<- Event) (*Process, error) { if len(args) < 1 { return nil, fmt.Errorf("need at least one argument") } p := &Process{ - Name: name, - env: env, - args: args, - dir: dir, - shutdown: make(chan struct{}), - done: make(chan struct{}), + Name: name, + memoryThreshold: memoryThreshold, + events: events, + env: env, + args: args, + dir: dir, + shutdown: make(chan struct{}), + done: make(chan struct{}), } go watch(p) @@ -63,6 +78,8 @@ func New(name string, env []string, args []string, dir string) (*Process, error) } func (p *Process) start(logger *log.Entry) (*exec.Cmd, error) { + startCounter.WithLabelValues(p.Name).Inc() + cmd := exec.Command(p.args[0], p.args[1:]...) cmd.Env = p.env cmd.Dir = p.dir @@ -71,6 +88,14 @@ func (p *Process) start(logger *log.Entry) (*exec.Cmd, error) { return cmd, cmd.Start() } +// Non-blocking notification +func (p *Process) notifyUp(pid int) { + select { + case p.events <- Event{Type: Up, Pid: pid}: + default: + } +} + func watch(p *Process) { // Count crashes to prevent a tight respawn loop. This is a 'circuit breaker'. crashes := 0 @@ -81,7 +106,7 @@ func watch(p *Process) { // on the monitor goroutine. monitorChan := make(chan monitorProcess, config.CrashThreshold) monitorDone := make(chan struct{}) - go monitorRss(monitorChan, monitorDone) + go monitorRss(monitorChan, monitorDone, p.events, p.memoryThreshold) spawnLoop: for { @@ -108,19 +133,26 @@ spawnLoop: logger.WithError(err).Error("start failed") continue } - logger.WithField("supervisor.pid", cmd.Process.Pid).Warn("spawned") + pid := cmd.Process.Pid + p.notifyUp(pid) + logger.WithField("supervisor.pid", pid).Warn("spawned") waitCh := make(chan struct{}) - go func() { - logger.WithError(cmd.Wait()).Warn("exited") + go func(cmd *exec.Cmd, waitCh chan struct{}) { + err := cmd.Wait() close(waitCh) - }() + logger.WithError(err).Warn("exited") + }(cmd, waitCh) - monitorChan <- monitorProcess{name: p.Name, pid: cmd.Process.Pid, wait: waitCh} + monitorChan <- monitorProcess{name: p.Name, pid: pid, wait: waitCh} waitLoop: for { select { + case <-time.After(1 * time.Minute): + // We repeat this idempotent notification because its delivery is not + // guaranteed. + p.notifyUp(pid) case <-time.After(config.CrashResetTime): crashes = 0 case <-waitCh: diff --git a/internal/supervisor/supervisor_test.go b/internal/supervisor/supervisor_test.go index 8ad6d82db..dc374fcd4 100644 --- a/internal/supervisor/supervisor_test.go +++ b/internal/supervisor/supervisor_test.go @@ -59,7 +59,7 @@ func testMain(m *testing.M) int { } func TestRespawnAfterCrashWithoutCircuitBreaker(t *testing.T) { - process, err := New(t.Name(), nil, []string{testExe}, testDir) + process, err := New(t.Name(), nil, []string{testExe}, testDir, 0, nil) require.NoError(t, err) defer process.Stop() @@ -80,7 +80,7 @@ func TestRespawnAfterCrashWithoutCircuitBreaker(t *testing.T) { } func TestTooManyCrashes(t *testing.T) { - process, err := New(t.Name(), nil, []string{testExe}, testDir) + process, err := New(t.Name(), nil, []string{testExe}, testDir, 0, nil) require.NoError(t, err) defer process.Stop() @@ -104,7 +104,7 @@ func TestSpawnFailure(t *testing.T) { require.NoError(t, os.RemoveAll(notFoundExe)) defer os.Remove(notFoundExe) - process, err := New(t.Name(), nil, []string{notFoundExe}, testDir) + process, err := New(t.Name(), nil, []string{notFoundExe}, testDir, 0, nil) require.NoError(t, err) defer process.Stop() diff --git a/ruby/bin/gitaly-ruby b/ruby/bin/gitaly-ruby index 8b486601e..5bbec510e 100755 --- a/ruby/bin/gitaly-ruby +++ b/ruby/bin/gitaly-ruby @@ -28,7 +28,20 @@ def main GitalyServer.register_handlers(s) - s.run_till_terminated + wait_thread = Thread.new do + sleep + end + + trap('TERM') { wait_thread.kill } + + run_thread = Thread.new do + s.run + wait_thread.kill + end + + wait_thread.join + s.stop + run_thread.join end def start_parent_watcher(original_ppid) |