Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Newdigate <andrew@gitlab.com>2017-12-07 19:55:43 +0300
committerAndrew Newdigate <andrew@gitlab.com>2017-12-07 19:55:43 +0300
commit113346b9589393d94ec9ba97b4fe8f631640f375 (patch)
tree43e482e7377f40f08d8878f2c46f1c4cda28eb44
parent164adab67e9f28b7ece915996c3dc5ca130e646d (diff)
parent776c0e6505c8ba14c0a5fbeb21f4cb78af2dc401 (diff)
Merge branch 'balancer' into 'master'
Restart gitaly-ruby when it uses too much memory Closes #720 See merge request gitlab-org/gitaly!465
-rw-r--r--CHANGELOG.md5
-rw-r--r--config.toml.example9
-rw-r--r--doc/configuration/README.md17
-rw-r--r--internal/config/config.go8
-rw-r--r--internal/config/ruby.go45
-rw-r--r--internal/rubyserver/balancer/balancer.go184
-rw-r--r--internal/rubyserver/balancer/balancer_test.go67
-rw-r--r--internal/rubyserver/rubyserver.go50
-rw-r--r--internal/rubyserver/stopwatch.go35
-rw-r--r--internal/rubyserver/worker.go155
-rw-r--r--internal/rubyserver/worker_test.go181
-rw-r--r--internal/supervisor/events.go20
-rw-r--r--internal/supervisor/monitor.go18
-rw-r--r--internal/supervisor/supervisor.go58
-rw-r--r--internal/supervisor/supervisor_test.go6
-rwxr-xr-xruby/bin/gitaly-ruby15
16 files changed, 828 insertions, 45 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 11f546e63..fdfe85417 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,10 @@
# Gitaly changelog
+UNRELEASED
+
+- Restart gitaly-ruby when it uses too much memory
+ https://gitlab.com/gitlab-org/gitaly/merge_requests/465
+
v0.58.0
- Implement RepostoryService::Fsck
diff --git a/config.toml.example b/config.toml.example
index 2ad2fd4ef..10f8cce49 100644
--- a/config.toml.example
+++ b/config.toml.example
@@ -42,6 +42,15 @@ path = "/home/git/repositories"
# The directory where gitaly-ruby is installed
dir = "/home/git/gitaly/ruby"
+# # Gitaly-ruby resident set size (RSS) that triggers a memory restart (bytes)
+# max_rss = 300000000
+#
+# # Grace period before a gitaly-ruby process is forcibly terminated after exceeding max_rss (seconds)
+# graceful_restart_timeout = "10m"
+#
+# # Time that gitaly-ruby memory must remain high before a restart (seconds)
+# restart_delay = "5m"
+
[gitlab-shell]
# The directory where gitlab-shell is installed
dir = "/home/git/gitlab-shell"
diff --git a/doc/configuration/README.md b/doc/configuration/README.md
index 2aa1940f8..a5e9b5265 100644
--- a/doc/configuration/README.md
+++ b/doc/configuration/README.md
@@ -94,6 +94,23 @@ match those in gitlab.yml.
|path|string|yes|Path to storage shard|
|name|string|yes|Name of storage shard|
+### gitaly-ruby
+
+A Gitaly process uses one or more gitaly-ruby helper processes to
+execute RPC's implemented in Ruby instead of Go. The `[gitaly-ruby]`
+section of the config file contains settings for these helper processes.
+
+These processes are known to occasionally suffer from memory leaks.
+Gitaly restarts its gitaly-ruby helpers when there memory exceeds the
+max\_rss limit.
+
+|name|type|required|notes|
+|----|----|--------|-----|
+|dir|string|yes|Path to where gitaly-ruby is installed (needed to boot the process).|
+|max_rss|integer|no|Resident set size limit that triggers a gitaly-ruby restart, in bytes. Default 300MB.|
+|graceful_restart_timeout|string|no|Grace period to allow a gitaly-ruby process to finish ongoing requests. Default 10 minutes ("10m").|
+|restart_delay|string|no|Time memory must be high before a restart is triggered, in seconds. Default 5 minutes ("5m").|
+
## Environment variables
### GITALY_DEBUG
diff --git a/internal/config/config.go b/internal/config/config.go
index 681525230..db2205b91 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -116,14 +116,6 @@ func validateShell() error {
return validateIsDirectory(Config.GitlabShell.Dir, "gitlab-shell.dir")
}
-func validateRuby() error {
- if len(Config.Ruby.Dir) == 0 {
- return fmt.Errorf("gitaly-ruby.dir is not set")
- }
-
- return validateIsDirectory(Config.Ruby.Dir, "gitaly-ruby.dir")
-}
-
func validateIsDirectory(path, name string) error {
s, err := os.Stat(path)
if err != nil {
diff --git a/internal/config/ruby.go b/internal/config/ruby.go
index 71ad92d56..dbafae1f3 100644
--- a/internal/config/ruby.go
+++ b/internal/config/ruby.go
@@ -1,6 +1,49 @@
package config
+import (
+ "fmt"
+ "time"
+)
+
// Ruby contains setting for Ruby worker processes
type Ruby struct {
- Dir string `toml:"dir"`
+ Dir string `toml:"dir"`
+ MaxRSS int `toml:"max_rss"`
+ GracefulRestartTimeout time.Duration
+ GracefulRestartTimeoutToml duration `toml:"graceful_restart_timeout"`
+ RestartDelay time.Duration
+ RestartDelayToml duration `toml:"restart_delay"`
+}
+
+// This type is a trick to let our TOML library parse durations from strings.
+type duration struct {
+ time.Duration
+}
+
+func (d *duration) UnmarshalText(text []byte) error {
+ var err error
+ d.Duration, err = time.ParseDuration(string(text))
+ return err
+}
+
+func validateRuby() error {
+ Config.Ruby.GracefulRestartTimeout = Config.Ruby.GracefulRestartTimeoutToml.Duration
+ if Config.Ruby.GracefulRestartTimeout == 0 {
+ Config.Ruby.GracefulRestartTimeout = 10 * time.Minute
+ }
+
+ if Config.Ruby.MaxRSS == 0 {
+ Config.Ruby.MaxRSS = 300 * 1024 * 1024
+ }
+
+ Config.Ruby.RestartDelay = Config.Ruby.RestartDelayToml.Duration
+ if Config.Ruby.RestartDelay == 0 {
+ Config.Ruby.RestartDelay = 5 * time.Minute
+ }
+
+ if len(Config.Ruby.Dir) == 0 {
+ return fmt.Errorf("gitaly-ruby.dir is not set")
+ }
+
+ return validateIsDirectory(Config.Ruby.Dir, "gitaly-ruby.dir")
}
diff --git a/internal/rubyserver/balancer/balancer.go b/internal/rubyserver/balancer/balancer.go
new file mode 100644
index 000000000..93d51d39e
--- /dev/null
+++ b/internal/rubyserver/balancer/balancer.go
@@ -0,0 +1,184 @@
+package balancer
+
+// In this package we manage a global pool of addresses for gitaly-ruby,
+// accessed via the gitaly-ruby:// scheme. The interface consists of the
+// AddAddress and RemoveAddress methods. RemoveAddress returns a boolean
+// indicating whether the address was removed; this is intended to give
+// back-pressure against repeated process restarts.
+//
+// The gitaly-ruby:// scheme exists because that is the way we can
+// interact with the internal client-side loadbalancer of grpc-go. A URL
+// for this scheme would be gitaly-ruby://foobar. For gitaly-ruby://
+// URL's, the host and port are ignored. So gitaly-ruby://foobar is
+// actually a working, valid address.
+//
+// Strictly speaking this package implements a gRPC 'Resolver'. This
+// resolver feeds address list updates to a gRPC 'balancer' which
+// interacts with the gRPC client connection machinery. A resolver
+// consists of a Builder which returns Resolver instances. Our Builder
+// manages the address pool and notifies its Resolver instances of
+// changes, which they then propagate into the gRPC library.
+//
+
+import (
+ "google.golang.org/grpc/resolver"
+)
+
+var (
+ lbBuilder = newBuilder()
+)
+
+func init() {
+ resolver.Register(lbBuilder)
+}
+
+// AddAddress adds the address of a gitaly-ruby instance to the load
+// balancer.
+func AddAddress(a string) {
+ lbBuilder.addAddress <- a
+}
+
+// RemoveAddress removes the address of a gitaly-ruby instance from the
+// load balancer. Returns false if the pool is too small to remove the
+// address.
+func RemoveAddress(addr string) bool {
+ ok := make(chan bool)
+ lbBuilder.removeAddress <- addressRemoval{ok: ok, addr: addr}
+ return <-ok
+}
+
+type addressRemoval struct {
+ addr string
+ ok chan<- bool
+}
+
+type addressUpdate struct {
+ addrs []resolver.Address
+ next chan struct{}
+}
+
+type builder struct {
+ addAddress chan string
+ removeAddress chan addressRemoval
+ addressUpdates chan addressUpdate
+}
+
+func newBuilder() *builder {
+ b := &builder{
+ addAddress: make(chan string),
+ removeAddress: make(chan addressRemoval),
+ addressUpdates: make(chan addressUpdate),
+ }
+ go b.monitor()
+
+ return b
+}
+
+// Scheme is the name of the address scheme that makes gRPC select this resolver.
+const Scheme = "gitaly-ruby"
+
+func (*builder) Scheme() string { return Scheme }
+
+// Build ignores its resolver.Target argument. That means it does not
+// care what "address" the caller wants to resolve. We always resolve to
+// the current list of address for local gitaly-ruby processes.
+func (b *builder) Build(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOption) (resolver.Resolver, error) {
+ return newGitalyResolver(cc, b.addressUpdates), nil
+}
+
+// monitor serves address list requests and handles address updates.
+func (b *builder) monitor() {
+ addresses := make(map[string]struct{})
+ notify := make(chan struct{})
+
+ for {
+ au := addressUpdate{next: notify}
+ for a := range addresses {
+ au.addrs = append(au.addrs, resolver.Address{Addr: a})
+ }
+
+ select {
+ case b.addressUpdates <- au:
+ if len(au.addrs) == 0 {
+ panic("builder monitor sent empty address update")
+ }
+ case addr := <-b.addAddress:
+ addresses[addr] = struct{}{}
+ notify = broadcast(notify)
+ case removal := <-b.removeAddress:
+ _, addressKnown := addresses[removal.addr]
+ if !addressKnown || len(addresses) <= 1 {
+ removal.ok <- false
+ break
+ }
+
+ delete(addresses, removal.addr)
+ removal.ok <- true
+ notify = broadcast(notify)
+ }
+ }
+}
+
+// broadcast returns a fresh channel because we can only close them once
+func broadcast(ch chan struct{}) chan struct{} {
+ close(ch)
+ return make(chan struct{})
+}
+
+// gitalyResolver propagates address list updates to a
+// resolver.ClientConn instance
+type gitalyResolver struct {
+ clientConn resolver.ClientConn
+
+ started chan struct{}
+ done chan struct{}
+ resolveNow chan struct{}
+ addressUpdates chan addressUpdate
+}
+
+func newGitalyResolver(cc resolver.ClientConn, auCh chan addressUpdate) *gitalyResolver {
+ r := &gitalyResolver{
+ started: make(chan struct{}),
+ done: make(chan struct{}),
+ resolveNow: make(chan struct{}),
+ addressUpdates: auCh,
+ clientConn: cc,
+ }
+ go r.monitor()
+
+ // Don't return until we have sent at least one address update. This is
+ // meant to avoid panics inside the grpc-go library.
+ <-r.started
+
+ return r
+}
+
+func (r *gitalyResolver) ResolveNow(resolver.ResolveNowOption) {
+ r.resolveNow <- struct{}{}
+}
+
+func (r *gitalyResolver) Close() {
+ close(r.done)
+}
+
+func (r *gitalyResolver) monitor() {
+ notify := r.sendUpdate()
+ close(r.started)
+
+ for {
+ select {
+ case <-notify:
+ notify = r.sendUpdate()
+ case <-r.resolveNow:
+ notify = r.sendUpdate()
+ case <-r.done:
+ return
+ }
+ }
+}
+
+func (r *gitalyResolver) sendUpdate() chan struct{} {
+ au := <-r.addressUpdates
+ r.clientConn.NewAddress(au.addrs)
+ return au.next
+}
diff --git a/internal/rubyserver/balancer/balancer_test.go b/internal/rubyserver/balancer/balancer_test.go
new file mode 100644
index 000000000..a4d13493e
--- /dev/null
+++ b/internal/rubyserver/balancer/balancer_test.go
@@ -0,0 +1,67 @@
+package balancer
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/require"
+)
+
+func TestRemovals(t *testing.T) {
+ okActions := []action{
+ {add: "foo"},
+ {add: "bar"},
+ {add: "qux"},
+ {remove: "bar"},
+ {remove: "foo"},
+ }
+
+ testCases := []struct {
+ desc string
+ actions []action
+ lastFails bool
+ }{
+ {
+ desc: "add then remove",
+ actions: okActions,
+ },
+ {
+ desc: "remove last address",
+ actions: append(okActions, action{remove: "qux"}),
+ lastFails: true,
+ },
+ {
+ desc: "remove unknown address",
+ actions: []action{
+ {add: "foo"},
+ {remove: "bar"},
+ },
+ lastFails: true,
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.desc, func(t *testing.T) {
+ // This breaks integration with gRPC and causes a monitor goroutine leak.
+ // Not a problem for this test.
+ lbBuilder = newBuilder()
+
+ for i, a := range tc.actions {
+ if a.add != "" {
+ AddAddress(a.add)
+ } else {
+ expected := true
+ if i+1 == len(tc.actions) && tc.lastFails {
+ expected = false
+ }
+
+ require.Equal(t, expected, RemoveAddress(a.remove), "expected result from removing %q", a.remove)
+ }
+ }
+ })
+ }
+}
+
+type action struct {
+ add string
+ remove string
+}
diff --git a/internal/rubyserver/rubyserver.go b/internal/rubyserver/rubyserver.go
index 8753e213d..821529efa 100644
--- a/internal/rubyserver/rubyserver.go
+++ b/internal/rubyserver/rubyserver.go
@@ -13,6 +13,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/command"
"gitlab.com/gitlab-org/gitaly/internal/config"
+ "gitlab.com/gitlab-org/gitaly/internal/rubyserver/balancer"
"gitlab.com/gitlab-org/gitaly/internal/supervisor"
"gitlab.com/gitlab-org/gitaly/streamio"
@@ -53,17 +54,17 @@ func prepareSocketPath() {
}
}
-func socketPath() string {
+func socketPath(id int) string {
if socketDir == "" {
panic("socketDir is not set")
}
- return path.Join(filepath.Clean(socketDir), "socket")
+ return path.Join(filepath.Clean(socketDir), fmt.Sprintf("socket.%d", id))
}
// Server represents a gitaly-ruby helper process.
type Server struct {
- *supervisor.Process
+ workers []*worker
clientConnMu sync.RWMutex
clientConn *grpc.ClientConn
}
@@ -77,8 +78,8 @@ func (s *Server) Stop() {
s.clientConn.Close()
}
- if s.Process != nil {
- s.Process.Stop()
+ for _, w := range s.workers {
+ w.Process.Stop()
}
}
@@ -97,21 +98,36 @@ func Start() (*Server, error) {
}
cfg := config.Config
- env := []string{
- "GITALY_RUBY_GIT_BIN_PATH=" + command.GitPath(),
+ env := append(
+ os.Environ(),
+ "GITALY_RUBY_GIT_BIN_PATH="+command.GitPath(),
fmt.Sprintf("GITALY_RUBY_WRITE_BUFFER_SIZE=%d", streamio.WriteBufferSize),
- "GITALY_RUBY_GITLAB_SHELL_PATH=" + cfg.GitlabShell.Dir,
- "GITALY_RUBY_GITALY_BIN_DIR=" + cfg.BinDir,
- }
+ "GITALY_RUBY_GITLAB_SHELL_PATH="+cfg.GitlabShell.Dir,
+ "GITALY_RUBY_GITALY_BIN_DIR="+cfg.BinDir,
+ )
gitalyRuby := path.Join(cfg.Ruby.Dir, "bin/gitaly-ruby")
- // Use 'ruby-cd' to make sure gitaly-ruby has the same working directory
- // as the current process. This is a hack to sort-of support relative
- // Unix socket paths.
- args := []string{"bundle", "exec", "bin/ruby-cd", wd, gitalyRuby, fmt.Sprintf("%d", os.Getpid()), socketPath()}
+ s := &Server{}
+ for i := 0; i < numWorkers; i++ {
+ name := fmt.Sprintf("gitaly-ruby.%d", i)
+ socketPath := socketPath(i)
+
+ // Use 'ruby-cd' to make sure gitaly-ruby has the same working directory
+ // as the current process. This is a hack to sort-of support relative
+ // Unix socket paths.
+ args := []string{"bundle", "exec", "bin/ruby-cd", wd, gitalyRuby, strconv.Itoa(os.Getpid()), socketPath}
+
+ events := make(chan supervisor.Event)
+
+ p, err := supervisor.New(name, env, args, cfg.Ruby.Dir, cfg.Ruby.MaxRSS, events)
+ if err != nil {
+ return nil, err
+ }
+
+ s.workers = append(s.workers, newWorker(p, socketPath, events))
+ }
- p, err := supervisor.New("gitaly-ruby", append(os.Environ(), env...), args, cfg.Ruby.Dir)
- return &Server{Process: p}, err
+ return s, nil
}
// CommitServiceClient returns a CommitServiceClient instance that is
@@ -185,7 +201,7 @@ func (s *Server) createConnection(ctx context.Context) (*grpc.ClientConn, error)
dialCtx, cancel := context.WithTimeout(ctx, ConnectTimeout)
defer cancel()
- conn, err := grpc.DialContext(dialCtx, socketPath(), dialOptions()...)
+ conn, err := grpc.DialContext(dialCtx, balancer.Scheme+"://gitaly-ruby", dialOptions()...)
if err != nil {
return nil, err
}
diff --git a/internal/rubyserver/stopwatch.go b/internal/rubyserver/stopwatch.go
new file mode 100644
index 000000000..b082cb155
--- /dev/null
+++ b/internal/rubyserver/stopwatch.go
@@ -0,0 +1,35 @@
+package rubyserver
+
+import (
+ "time"
+)
+
+type stopwatch struct {
+ t1 time.Time
+ t2 time.Time
+ running bool
+}
+
+// mark records the current time and starts the stopwatch if it is not already running
+func (st *stopwatch) mark() {
+ st.t2 = time.Now()
+
+ if !st.running {
+ st.t1 = st.t2
+ st.running = true
+ }
+}
+
+// reset stops the stopwatch and returns it to zero
+func (st *stopwatch) reset() {
+ st.running = false
+}
+
+// elapsed returns the time elapsed between the first and last 'mark'
+func (st *stopwatch) elapsed() time.Duration {
+ if !st.running {
+ return time.Duration(0)
+ }
+
+ return st.t2.Sub(st.t1)
+}
diff --git a/internal/rubyserver/worker.go b/internal/rubyserver/worker.go
new file mode 100644
index 000000000..79f80334b
--- /dev/null
+++ b/internal/rubyserver/worker.go
@@ -0,0 +1,155 @@
+package rubyserver
+
+import (
+ "fmt"
+ "syscall"
+ "time"
+
+ "gitlab.com/gitlab-org/gitaly/internal/config"
+ "gitlab.com/gitlab-org/gitaly/internal/rubyserver/balancer"
+ "gitlab.com/gitlab-org/gitaly/internal/supervisor"
+
+ "github.com/prometheus/client_golang/prometheus"
+ log "github.com/sirupsen/logrus"
+)
+
+const (
+ // We might want numWorkers to be configurable in the future. But at the
+ // moment we only support 'pick first' balancing so there is no point in
+ // having more than 2 workers yet.
+ numWorkers = 2
+)
+
+var (
+ terminationCounter = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: "gitaly_ruby_memory_terminations_total",
+ Help: "Number of times gitaly-ruby has been terminated because of excessive memory use.",
+ },
+ []string{"name"},
+ )
+)
+
+func init() {
+ prometheus.MustRegister(terminationCounter)
+}
+
+// worker observes the event stream of a supervised process and restarts
+// it if necessary, in cooperation with the balancer.
+type worker struct {
+ *supervisor.Process
+ address string
+ events <-chan supervisor.Event
+
+ // This is for testing only, so that we can inject a fake balancer
+ balancerUpdate chan balancerProxy
+}
+
+func newWorker(p *supervisor.Process, address string, events <-chan supervisor.Event) *worker {
+ w := &worker{
+ Process: p,
+ address: address,
+ events: events,
+ balancerUpdate: make(chan balancerProxy),
+ }
+ go w.monitor()
+
+ bal := defaultBalancer{}
+ w.balancerUpdate <- bal
+
+ // When we return from this function, requests may start coming in. If
+ // there are no addresses in the balancer when the first request comes in
+ // we can get a panic from grpc-go. So before returning, we ensure the
+ // current address has been added to the balancer.
+ bal.AddAddress(w.address)
+
+ return w
+}
+
+type balancerProxy interface {
+ AddAddress(string)
+ RemoveAddress(string) bool
+}
+
+type defaultBalancer struct{}
+
+func (defaultBalancer) AddAddress(s string) { balancer.AddAddress(s) }
+func (defaultBalancer) RemoveAddress(s string) bool { return balancer.RemoveAddress(s) }
+
+func (w *worker) monitor() {
+ sw := &stopwatch{}
+ currentPid := 0
+ bal := <-w.balancerUpdate
+
+ for {
+ nextEvent:
+ select {
+ case e := <-w.events:
+ if e.Pid <= 0 {
+ log.WithFields(log.Fields{
+ "worker.name": w.Name,
+ "worker.event_pid": e.Pid,
+ }).Info("received invalid PID")
+ break nextEvent
+ }
+
+ switch e.Type {
+ case supervisor.Up:
+ if e.Pid == currentPid {
+ // Ignore repeated events to avoid constantly resetting our internal
+ // state.
+ break nextEvent
+ }
+
+ bal.AddAddress(w.address)
+ currentPid = e.Pid
+ sw.reset()
+ case supervisor.MemoryHigh:
+ if e.Pid != currentPid {
+ break nextEvent
+ }
+
+ sw.mark()
+ if sw.elapsed() <= config.Config.Ruby.RestartDelay {
+ break nextEvent
+ }
+
+ // It is crucial to check the return value of RemoveAddress. If we don't
+ // we may leave the system without the capacity to make gitaly-ruby
+ // requests.
+ if bal.RemoveAddress(w.address) {
+ go w.waitTerminate(e.Pid)
+ sw.reset()
+ }
+ case supervisor.MemoryLow:
+ if e.Pid != currentPid {
+ break nextEvent
+ }
+
+ sw.reset()
+ default:
+ panic(fmt.Sprintf("unknown state %v", e.Type))
+ }
+ case bal = <-w.balancerUpdate:
+ // For testing only.
+ }
+ }
+}
+
+func (w *worker) waitTerminate(pid int) {
+ terminationCounter.WithLabelValues(w.Name).Inc()
+
+ log.WithFields(log.Fields{
+ "worker.name": w.Name,
+ "worker.pid": pid,
+ }).Info("sending SIGTERM")
+ syscall.Kill(pid, syscall.SIGTERM)
+
+ time.Sleep(config.Config.Ruby.GracefulRestartTimeout)
+
+ log.WithFields(log.Fields{
+ "worker.name": w.Name,
+ "worker.pid": pid,
+ }).Info("sending SIGKILL")
+ syscall.Kill(pid, syscall.SIGKILL)
+}
diff --git a/internal/rubyserver/worker_test.go b/internal/rubyserver/worker_test.go
new file mode 100644
index 000000000..70bd21f0e
--- /dev/null
+++ b/internal/rubyserver/worker_test.go
@@ -0,0 +1,181 @@
+package rubyserver
+
+import (
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/config"
+ "gitlab.com/gitlab-org/gitaly/internal/supervisor"
+)
+
+func TestWorker(t *testing.T) {
+ restartDelay := 100 * time.Millisecond
+ defer func(old time.Duration) {
+ config.Config.Ruby.RestartDelay = old
+ }(config.Config.Ruby.RestartDelay)
+ config.Config.Ruby.RestartDelay = restartDelay
+
+ events := make(chan supervisor.Event)
+ addr := "the address"
+ w := newWorker(&supervisor.Process{Name: "testing"}, addr, events)
+
+ mustIgnore := func(e supervisor.Event) {
+ nothing := &nothingBalancer{t}
+ w.balancerUpdate <- nothing
+ t.Logf("sending Event %+v, expect nothing to happen", e)
+ events <- e
+ // This second balancer update is used to synchronize with the monitor
+ // goroutine. When the channel send finishes, we know the event we sent
+ // before must have been processed.
+ w.balancerUpdate <- nothing
+ }
+
+ mustAdd := func(e supervisor.Event) {
+ add := newAdd(t, addr)
+ w.balancerUpdate <- add
+ t.Logf("sending Event %+v, expect balancer add", e)
+ events <- e
+ add.wait()
+ }
+
+ mustRemove := func(e supervisor.Event) {
+ remove := newRemove(t, addr)
+ w.balancerUpdate <- remove
+ t.Logf("sending Event %+v, expect balancer remove", e)
+ events <- e
+ remove.wait()
+ }
+
+ firstPid := 123
+
+ mustAdd(upEvent(firstPid))
+
+ t.Log("ignore repeated up event")
+ mustIgnore(upEvent(firstPid))
+
+ t.Log("send mem high events but too fast to trigger restart")
+ for i := 0; i < 5; i++ {
+ mustIgnore(memHighEvent(firstPid))
+ }
+
+ t.Log("mem low resets mem high counter")
+ mustIgnore(memLowEvent(firstPid))
+
+ t.Log("send mem high events but too fast to trigger restart")
+ for i := 0; i < 5; i++ {
+ mustIgnore(memHighEvent(firstPid))
+ }
+
+ time.Sleep(2 * restartDelay)
+ t.Log("this mem high should push us over the threshold")
+ mustRemove(memHighEvent(firstPid))
+
+ secondPid := 456
+ t.Log("time for a new PID")
+ mustAdd(upEvent(secondPid))
+
+ t.Log("ignore mem high events for the previous pid")
+ mustIgnore(memHighEvent(firstPid))
+ time.Sleep(2 * restartDelay)
+ mustIgnore(memHighEvent(firstPid))
+
+ t.Log("start high memory timer")
+ mustIgnore(memHighEvent(secondPid))
+
+ t.Log("ignore mem low event for wrong pid")
+ mustIgnore(memLowEvent(firstPid))
+
+ t.Log("send mem high count over the threshold")
+ time.Sleep(2 * restartDelay)
+ mustRemove(memHighEvent(secondPid))
+}
+
+func waitFail(t *testing.T, done chan struct{}) {
+ select {
+ case <-time.After(1 * time.Second):
+ t.Fatal("timeout waiting for balancer method call")
+ case <-done:
+ }
+}
+
+func upEvent(pid int) supervisor.Event {
+ return supervisor.Event{Type: supervisor.Up, Pid: pid}
+}
+
+func memHighEvent(pid int) supervisor.Event {
+ return supervisor.Event{Type: supervisor.MemoryHigh, Pid: pid}
+}
+
+func memLowEvent(pid int) supervisor.Event {
+ return supervisor.Event{Type: supervisor.MemoryLow, Pid: pid}
+}
+
+func newAdd(t *testing.T, addr string) *addBalancer {
+ return &addBalancer{
+ t: t,
+ addr: addr,
+ done: make(chan struct{}),
+ }
+}
+
+type addBalancer struct {
+ addr string
+ t *testing.T
+ done chan struct{}
+}
+
+func (ab *addBalancer) RemoveAddress(string) bool {
+ ab.t.Fatal("unexpected RemoveAddress call")
+ return false
+}
+
+func (ab *addBalancer) AddAddress(s string) {
+ require.Equal(ab.t, ab.addr, s, "addBalancer expected AddAddress argument")
+ close(ab.done)
+}
+
+func (ab *addBalancer) wait() {
+ waitFail(ab.t, ab.done)
+}
+
+func newRemove(t *testing.T, addr string) *removeBalancer {
+ return &removeBalancer{
+ t: t,
+ addr: addr,
+ done: make(chan struct{}),
+ }
+}
+
+type removeBalancer struct {
+ addr string
+ t *testing.T
+ done chan struct{}
+}
+
+func (rb *removeBalancer) RemoveAddress(s string) bool {
+ require.Equal(rb.t, rb.addr, s, "removeBalancer expected RemoveAddress argument")
+ close(rb.done)
+ return false
+}
+
+func (rb *removeBalancer) AddAddress(s string) {
+ rb.t.Fatal("unexpected AddAddress call")
+}
+
+func (rb *removeBalancer) wait() {
+ waitFail(rb.t, rb.done)
+}
+
+type nothingBalancer struct {
+ t *testing.T
+}
+
+func (nb *nothingBalancer) RemoveAddress(s string) bool {
+ nb.t.Fatal("unexpected RemoveAddress call")
+ return false
+}
+
+func (nb *nothingBalancer) AddAddress(s string) {
+ nb.t.Fatal("unexpected AddAddress call")
+}
diff --git a/internal/supervisor/events.go b/internal/supervisor/events.go
new file mode 100644
index 000000000..19822609e
--- /dev/null
+++ b/internal/supervisor/events.go
@@ -0,0 +1,20 @@
+package supervisor
+
+// EventType is used to label Event instances.
+type EventType int
+
+const (
+ // Up is a notification that the process with the accompanying PID is up.
+ Up EventType = iota
+ // MemoryHigh is a notification that process memory for the current PID
+ // exceeds the threshold.
+ MemoryHigh
+ // MemoryLow indicates the process memory is at or below the threshold.
+ MemoryLow
+)
+
+// Event is used to notify a listener of process state changes.
+type Event struct {
+ Type EventType
+ Pid int
+}
diff --git a/internal/supervisor/monitor.go b/internal/supervisor/monitor.go
index 443e212fa..447f906bd 100644
--- a/internal/supervisor/monitor.go
+++ b/internal/supervisor/monitor.go
@@ -29,7 +29,7 @@ type monitorProcess struct {
wait <-chan struct{}
}
-func monitorRss(procs <-chan monitorProcess, done chan<- struct{}) {
+func monitorRss(procs <-chan monitorProcess, done chan<- struct{}, events chan<- Event, threshold int) {
t := time.NewTicker(15 * time.Second)
defer t.Stop()
@@ -38,7 +38,21 @@ func monitorRss(procs <-chan monitorProcess, done chan<- struct{}) {
for mp := range procs {
monitorLoop:
for {
- rssGauge.WithLabelValues(mp.name).Set(float64(1024 * getRss(mp.pid)))
+ rss := 1024 * getRss(mp.pid)
+ rssGauge.WithLabelValues(mp.name).Set(float64(rss))
+
+ if rss > 0 {
+ event := Event{Type: MemoryLow, Pid: mp.pid}
+ if rss > threshold {
+ event.Type = MemoryHigh
+ }
+
+ select {
+ case events <- event:
+ default:
+ // The default case makes this non-blocking
+ }
+ }
select {
case <-mp.wait:
diff --git a/internal/supervisor/supervisor.go b/internal/supervisor/supervisor.go
index d13fed3dc..c862ec723 100644
--- a/internal/supervisor/supervisor.go
+++ b/internal/supervisor/supervisor.go
@@ -7,6 +7,7 @@ import (
"time"
"github.com/kelseyhightower/envconfig"
+ "github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
)
@@ -21,17 +22,29 @@ type Config struct {
}
var (
+ startCounter = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: "gitaly_supervisor_starts_total",
+ Help: "Number of starts of supervised processes.",
+ },
+ []string{"name"},
+ )
+
config Config
)
func init() {
envconfig.MustProcess("gitaly_supervisor", &config)
+ prometheus.MustRegister(startCounter)
}
// Process represents a running process.
type Process struct {
Name string
+ memoryThreshold int
+ events chan<- Event
+
// Information to start the process
env []string
args []string
@@ -44,18 +57,20 @@ type Process struct {
}
// New creates a new proces instance.
-func New(name string, env []string, args []string, dir string) (*Process, error) {
+func New(name string, env []string, args []string, dir string, memoryThreshold int, events chan<- Event) (*Process, error) {
if len(args) < 1 {
return nil, fmt.Errorf("need at least one argument")
}
p := &Process{
- Name: name,
- env: env,
- args: args,
- dir: dir,
- shutdown: make(chan struct{}),
- done: make(chan struct{}),
+ Name: name,
+ memoryThreshold: memoryThreshold,
+ events: events,
+ env: env,
+ args: args,
+ dir: dir,
+ shutdown: make(chan struct{}),
+ done: make(chan struct{}),
}
go watch(p)
@@ -63,6 +78,8 @@ func New(name string, env []string, args []string, dir string) (*Process, error)
}
func (p *Process) start(logger *log.Entry) (*exec.Cmd, error) {
+ startCounter.WithLabelValues(p.Name).Inc()
+
cmd := exec.Command(p.args[0], p.args[1:]...)
cmd.Env = p.env
cmd.Dir = p.dir
@@ -71,6 +88,14 @@ func (p *Process) start(logger *log.Entry) (*exec.Cmd, error) {
return cmd, cmd.Start()
}
+// Non-blocking notification
+func (p *Process) notifyUp(pid int) {
+ select {
+ case p.events <- Event{Type: Up, Pid: pid}:
+ default:
+ }
+}
+
func watch(p *Process) {
// Count crashes to prevent a tight respawn loop. This is a 'circuit breaker'.
crashes := 0
@@ -81,7 +106,7 @@ func watch(p *Process) {
// on the monitor goroutine.
monitorChan := make(chan monitorProcess, config.CrashThreshold)
monitorDone := make(chan struct{})
- go monitorRss(monitorChan, monitorDone)
+ go monitorRss(monitorChan, monitorDone, p.events, p.memoryThreshold)
spawnLoop:
for {
@@ -108,19 +133,26 @@ spawnLoop:
logger.WithError(err).Error("start failed")
continue
}
- logger.WithField("supervisor.pid", cmd.Process.Pid).Warn("spawned")
+ pid := cmd.Process.Pid
+ p.notifyUp(pid)
+ logger.WithField("supervisor.pid", pid).Warn("spawned")
waitCh := make(chan struct{})
- go func() {
- logger.WithError(cmd.Wait()).Warn("exited")
+ go func(cmd *exec.Cmd, waitCh chan struct{}) {
+ err := cmd.Wait()
close(waitCh)
- }()
+ logger.WithError(err).Warn("exited")
+ }(cmd, waitCh)
- monitorChan <- monitorProcess{name: p.Name, pid: cmd.Process.Pid, wait: waitCh}
+ monitorChan <- monitorProcess{name: p.Name, pid: pid, wait: waitCh}
waitLoop:
for {
select {
+ case <-time.After(1 * time.Minute):
+ // We repeat this idempotent notification because its delivery is not
+ // guaranteed.
+ p.notifyUp(pid)
case <-time.After(config.CrashResetTime):
crashes = 0
case <-waitCh:
diff --git a/internal/supervisor/supervisor_test.go b/internal/supervisor/supervisor_test.go
index 8ad6d82db..dc374fcd4 100644
--- a/internal/supervisor/supervisor_test.go
+++ b/internal/supervisor/supervisor_test.go
@@ -59,7 +59,7 @@ func testMain(m *testing.M) int {
}
func TestRespawnAfterCrashWithoutCircuitBreaker(t *testing.T) {
- process, err := New(t.Name(), nil, []string{testExe}, testDir)
+ process, err := New(t.Name(), nil, []string{testExe}, testDir, 0, nil)
require.NoError(t, err)
defer process.Stop()
@@ -80,7 +80,7 @@ func TestRespawnAfterCrashWithoutCircuitBreaker(t *testing.T) {
}
func TestTooManyCrashes(t *testing.T) {
- process, err := New(t.Name(), nil, []string{testExe}, testDir)
+ process, err := New(t.Name(), nil, []string{testExe}, testDir, 0, nil)
require.NoError(t, err)
defer process.Stop()
@@ -104,7 +104,7 @@ func TestSpawnFailure(t *testing.T) {
require.NoError(t, os.RemoveAll(notFoundExe))
defer os.Remove(notFoundExe)
- process, err := New(t.Name(), nil, []string{notFoundExe}, testDir)
+ process, err := New(t.Name(), nil, []string{notFoundExe}, testDir, 0, nil)
require.NoError(t, err)
defer process.Stop()
diff --git a/ruby/bin/gitaly-ruby b/ruby/bin/gitaly-ruby
index 8b486601e..5bbec510e 100755
--- a/ruby/bin/gitaly-ruby
+++ b/ruby/bin/gitaly-ruby
@@ -28,7 +28,20 @@ def main
GitalyServer.register_handlers(s)
- s.run_till_terminated
+ wait_thread = Thread.new do
+ sleep
+ end
+
+ trap('TERM') { wait_thread.kill }
+
+ run_thread = Thread.new do
+ s.run
+ wait_thread.kill
+ end
+
+ wait_thread.join
+ s.stop
+ run_thread.join
end
def start_parent_watcher(original_ppid)