author     Zeger-Jan van de Weg <zegerjan@gitlab.com>    2018-05-16 16:56:31 +0300
committer  Zeger-Jan van de Weg <zegerjan@gitlab.com>    2018-05-16 16:56:31 +0300
commit     0e8cb5ad07960952a94c7eb0ec3167cccd7ae9ec (patch)
tree       d64175229e45583de9eafed630ba67147d8b63eb
parent     c5a643aea441f165b46b40f4eda283d9dec59184 (diff)
parent     aff01a0e09974d7019d49753077742b66a3cca9c (diff)
Merge branch 'gitaly-ruby-round-robin' into 'master'
Use round robin load balancing instead of 'pick first' for gitaly-ruby
Closes #1167
See merge request gitlab-org/gitaly!700
-rw-r--r--  CHANGELOG.md                                   |   2
-rw-r--r--  config.toml.example                            |   3
-rw-r--r--  doc/configuration/README.md                    |   2
-rw-r--r--  internal/rubyserver/balancer/balancer.go       |  84
-rw-r--r--  internal/rubyserver/balancer/balancer_test.go  | 177
-rw-r--r--  internal/rubyserver/balancer/pool.go           |  59
-rw-r--r--  internal/rubyserver/rubyserver.go              |   5
-rw-r--r--  internal/testhelper/testhelper.go              |   6
8 files changed, 311 insertions, 27 deletions
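
A note on the mechanism before the diff itself: gRPC client-side load balancing is driven by a resolver, which pushes an address list and a service config into the ClientConn. Until now the gitaly-ruby resolver left gRPC on its default pick-first behavior, so every request landed on a single worker process; this change publishes the round_robin policy so requests rotate across all active workers. The standalone sketch below is illustrative only, not Gitaly code (the "static" scheme name and socket paths are invented), but it exercises the same resolver API that balancer.go uses:

package main

import "google.golang.org/grpc/resolver"

// staticBuilder is a toy resolver builder: it always reports the same
// two addresses and requests round-robin balancing, which is what the
// gitaly-ruby balancer now does dynamically for its worker sockets.
type staticBuilder struct{ addrs []string }

func (b *staticBuilder) Scheme() string { return "static" }

func (b *staticBuilder) Build(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOption) (resolver.Resolver, error) {
	// The same "magic words" this merge request starts sending unconditionally.
	cc.NewServiceConfig(`{"LoadBalancingPolicy":"round_robin"}`)

	var addrs []resolver.Address
	for _, a := range b.addrs {
		addrs = append(addrs, resolver.Address{Addr: a})
	}
	cc.NewAddress(addrs)

	return b, nil
}

// staticBuilder doubles as a no-op resolver: the address list never changes.
func (b *staticBuilder) ResolveNow(resolver.ResolveNowOption) {}
func (b *staticBuilder) Close()                               {}

func init() {
	resolver.Register(&staticBuilder{addrs: []string{"unix:///tmp/w0.socket", "unix:///tmp/w1.socket"}})
}

func main() {} // grpc.Dial("static:///ignored", ...) would now spread RPCs over both addresses
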
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9967b902a..adbb71c2f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,8 @@ UNRELEASED
 
+- Use round robin load balancing instead of 'pick first' for gitaly-ruby
+  https://gitlab.com/gitlab-org/gitaly/merge_requests/700
 - Unvendor Repository#create implementation
   https://gitlab.com/gitlab-org/gitaly/merge_requests/713
 - Add gitaly-ruby installation debug log messages
diff --git a/config.toml.example b/config.toml.example
index b47ce94eb..76e0ea047 100644
--- a/config.toml.example
+++ b/config.toml.example
@@ -56,6 +56,9 @@ dir = "/home/git/gitaly/ruby"
 #
 # # Time that gitaly-ruby memory must remain high before a restart (seconds)
 # restart_delay = "5m"
+#
+# # Number of gitaly-ruby worker processes
+# num_workers = 2
 
 [gitlab-shell]
 # The directory where gitlab-shell is installed
diff --git a/doc/configuration/README.md b/doc/configuration/README.md
index 88ac51b1e..337e11f87 100644
--- a/doc/configuration/README.md
+++ b/doc/configuration/README.md
@@ -110,7 +110,7 @@ max\_rss limit.
 |max_rss|integer|no|Resident set size limit that triggers a gitaly-ruby restart, in bytes. Default 300MB.|
 |graceful_restart_timeout|string|no|Grace period to allow a gitaly-ruby process to finish ongoing requests. Default 10 minutes ("10m").|
 |restart_delay|string|no|Time memory must be high before a restart is triggered, in seconds. Default 5 minutes ("5m").|
-|num_workers|integer|no|Number of gitaly-ruby worker processes. Default 2, minimum 2.|
+|num_workers|integer|no|Number of gitaly-ruby worker processes. Try increasing this number in case of ResourceExhausted errors. Default 2, minimum 2.|
 |linguist_languages_path|string|no|Override for dynamic languages.json discovery. Default: "" (use dynamic discovery).|
 
 ### Logging
diff --git a/internal/rubyserver/balancer/balancer.go b/internal/rubyserver/balancer/balancer.go
index bceb53338..4ba136244 100644
--- a/internal/rubyserver/balancer/balancer.go
+++ b/internal/rubyserver/balancer/balancer.go
@@ -21,6 +21,8 @@ package balancer
 //
 
 import (
+	"time"
+
 	"google.golang.org/grpc/resolver"
 )
 
@@ -32,6 +34,11 @@ func init() {
 	resolver.Register(lbBuilder)
 }
 
+const (
+	// DefaultRemoveDelay is the minimum time between successive address removals.
+	DefaultRemoveDelay = 1 * time.Minute
+)
+
 // AddAddress adds the address of a gitaly-ruby instance to the load
 // balancer.
 func AddAddress(a string) {
@@ -57,10 +64,38 @@ type addressUpdate struct {
 	next chan struct{}
 }
 
+type config struct {
+	numAddrs    int
+	removeDelay time.Duration
+}
+
 type builder struct {
 	addAddress     chan string
 	removeAddress  chan addressRemoval
 	addressUpdates chan addressUpdate
+	configUpdate   chan config
+
+	// for testing only
+	testingRestart chan struct{}
+}
+
+// ConfigureBuilder changes the configuration of the global balancer
+// instance. All calls that interact with the balancer will block until
+// ConfigureBuilder has been called at least once.
+func ConfigureBuilder(numAddrs int, removeDelay time.Duration) {
+	cfg := config{
+		numAddrs:    numAddrs,
+		removeDelay: removeDelay,
+	}
+
+	if cfg.removeDelay <= 0 {
+		cfg.removeDelay = DefaultRemoveDelay
+	}
+	if numAddrs <= 0 {
+		panic("numAddrs must be at least 1")
+	}
+
+	lbBuilder.configUpdate <- cfg
+}
 
 func newBuilder() *builder {
@@ -68,6 +103,8 @@ func newBuilder() *builder {
 		addAddress:     make(chan string),
 		removeAddress:  make(chan addressRemoval),
 		addressUpdates: make(chan addressUpdate),
+		configUpdate:   make(chan config),
+		testingRestart: make(chan struct{}),
 	}
 
 	go b.monitor()
@@ -83,43 +120,60 @@ func (*builder) Scheme() string { return Scheme }
 // care what "address" the caller wants to resolve. We always resolve to
 // the current list of address for local gitaly-ruby processes.
 func (b *builder) Build(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOption) (resolver.Resolver, error) {
-	// JV: Normally I would delete this but this is very poorly documented,
-	// and I don't want to have to look up the magic words again. In case we
-	// ever want to do round-robin.
-	// cc.NewServiceConfig(`{"LoadBalancingPolicy":"round_robin"}`)
-
+	cc.NewServiceConfig(`{"LoadBalancingPolicy":"round_robin"}`)
 	return newGitalyResolver(cc, b.addressUpdates), nil
 }
 
 // monitor serves address list requests and handles address updates.
 func (b *builder) monitor() {
-	addresses := make(map[string]struct{})
+	p := newPool()
 	notify := make(chan struct{})
+	cfg := <-b.configUpdate
+	lastRemoval := time.Now()
+
+	// This channel is intentionally nil so that our 'select' below won't
+	// send messages to it. We do this to prevent sending out invalid (empty)
+	// messages during boot.
+	var addressUpdates chan addressUpdate
 
 	for {
 		au := addressUpdate{next: notify}
-		for a := range addresses {
+		for _, a := range p.activeAddrs() {
 			au.addrs = append(au.addrs, resolver.Address{Addr: a})
 		}
 
+		if len(au.addrs) > 0 && addressUpdates == nil {
+			// Start listening for address update requests
+			addressUpdates = b.addressUpdates
+		}
+
 		select {
-		case b.addressUpdates <- au:
-			if len(au.addrs) == 0 {
-				panic("builder monitor sent empty address update")
-			}
+		case addressUpdates <- au:
+			// We have served an address update request
 		case addr := <-b.addAddress:
-			addresses[addr] = struct{}{}
+			p.add(addr)
 			notify = broadcast(notify)
 		case removal := <-b.removeAddress:
-			_, addressKnown := addresses[removal.addr]
-			if !addressKnown || len(addresses) <= 1 {
+			if time.Since(lastRemoval) < cfg.removeDelay || p.activeSize() < cfg.numAddrs-1 {
+				removal.ok <- false
+				break
+			}
+
+			if !p.remove(removal.addr) {
 				removal.ok <- false
 				break
 			}
 
-			delete(addresses, removal.addr)
 			removal.ok <- true
+			lastRemoval = time.Now()
 			notify = broadcast(notify)
+		case cfg = <-b.configUpdate:
+			// We have received a config update
+		case <-b.testingRestart:
+			go b.monitor()
+			b.configUpdate <- cfg
+			return
 		}
 	}
 }
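
Taken together, the balancer package's public surface after this change is ConfigureBuilder, AddAddress and RemoveAddress. A hedged usage sketch follows (the socket paths are invented, and RemoveAddress returning a bool is inferred from TestRemovals below):

package main

import (
	"time"

	"gitlab.com/gitlab-org/gitaly/internal/rubyserver/balancer"
)

func main() {
	// numAddrs and removeDelay; a delay <= 0 falls back to DefaultRemoveDelay.
	balancer.ConfigureBuilder(2, time.Minute)

	balancer.AddAddress("unix:///gitaly-ruby.0") // the first address becomes the hidden standby
	balancer.AddAddress("unix:///gitaly-ruby.1") // later addresses are served round-robin

	// Removals are deliberately conservative: they fail when the previous
	// removal (or process start) was less than removeDelay ago, when the
	// active set would drop below numAddrs-1, or when there is no standby
	// left to promote.
	if ok := balancer.RemoveAddress("unix:///gitaly-ruby.1"); !ok {
		// refused: too soon, unknown address, or no standby available
	}
}
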
diff --git a/internal/rubyserver/balancer/balancer_test.go b/internal/rubyserver/balancer/balancer_test.go
index a4d13493e..637b1d8f2 100644
--- a/internal/rubyserver/balancer/balancer_test.go
+++ b/internal/rubyserver/balancer/balancer_test.go
@@ -1,54 +1,171 @@
 package balancer
 
 import (
+	"encoding/json"
+	"fmt"
+	"strings"
+	"sync"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/require"
+	"google.golang.org/grpc/resolver"
 )
 
+func TestServiceConfig(t *testing.T) {
+	configureBuilderTest(3)
+
+	tcc := &testClientConn{}
+	lbBuilder.Build(resolver.Target{}, tcc, resolver.BuildOption{})
+
+	configUpdates := tcc.ConfigUpdates()
+	require.Len(t, configUpdates, 1, "expect exactly one config update")
+
+	svcConfig := struct{ LoadBalancingPolicy string }{}
+	require.NoError(t, json.NewDecoder(strings.NewReader(configUpdates[0])).Decode(&svcConfig))
+	require.Equal(t, "round_robin", svcConfig.LoadBalancingPolicy)
+}
+
+func TestAddressUpdatesSmallestPool(t *testing.T) {
+	// The smallest number of addresses is 2: 1 standby, and 1 active.
+	addrs := configureBuilderTest(2)
+
+	tcc := &testClientConn{}
+	lbBuilder.Build(resolver.Target{}, tcc, resolver.BuildOption{})
+
+	// Simulate some random updates
+	RemoveAddress(addrs[0])
+	RemoveAddress(addrs[0])
+	AddAddress(addrs[0])
+	RemoveAddress(addrs[1])
+	RemoveAddress(addrs[0])
+	AddAddress(addrs[1])
+	AddAddress(addrs[1])
+	RemoveAddress(addrs[1])
+	RemoveAddress(addrs[1])
+	RemoveAddress(addrs[1])
+	RemoveAddress(addrs[0])
+	AddAddress(addrs[0])
+
+	addrUpdates := tcc.AddrUpdates()
+	require.True(t, len(addrUpdates) > 0, "expected at least one address update")
+
+	expectedActive := len(addrs) - 1 // subtract 1 for the standby
+	for _, update := range addrUpdates {
+		require.Equal(t, expectedActive, len(update))
+	}
+}
+
+func TestAddressUpdatesRoundRobinPool(t *testing.T) {
+	// With 3 addresses in the pool, 2 will be active.
+	addrs := configureBuilderTest(3)
+
+	tcc := &testClientConn{}
+	lbBuilder.Build(resolver.Target{}, tcc, resolver.BuildOption{})
+
+	// Simulate some random updates
+	RemoveAddress(addrs[0])
+	RemoveAddress(addrs[0])
+	RemoveAddress(addrs[2])
+	AddAddress(addrs[0])
+	RemoveAddress(addrs[1])
+	RemoveAddress(addrs[0])
+	AddAddress(addrs[2])
+	AddAddress(addrs[1])
+	AddAddress(addrs[1])
+	RemoveAddress(addrs[1])
+	RemoveAddress(addrs[2])
+	RemoveAddress(addrs[1])
+	AddAddress(addrs[1])
+	RemoveAddress(addrs[2])
+	RemoveAddress(addrs[1])
+	RemoveAddress(addrs[0])
+	AddAddress(addrs[0])
+
+	addrUpdates := tcc.AddrUpdates()
+	require.True(t, len(addrUpdates) > 0, "expected at least one address update")
+
+	expectedActive := len(addrs) - 1 // subtract 1 for the standby
+	for _, update := range addrUpdates {
+		require.Equal(t, expectedActive, len(update))
+	}
+}
+
 func TestRemovals(t *testing.T) {
 	okActions := []action{
 		{add: "foo"},
 		{add: "bar"},
 		{add: "qux"},
 		{remove: "bar"},
+		{add: "baz"},
 		{remove: "foo"},
 	}
+	numAddr := 3
+	removeDelay := 1 * time.Millisecond
+	ConfigureBuilder(numAddr, removeDelay)
 
 	testCases := []struct {
 		desc      string
 		actions   []action
 		lastFails bool
+		delay     time.Duration
 	}{
 		{
 			desc:    "add then remove",
 			actions: okActions,
+			delay:   2 * removeDelay,
 		},
 		{
-			desc:      "remove last address",
+			desc:      "add then remove but too fast",
+			actions:   okActions,
+			lastFails: true,
+			delay:     0,
+		},
+		{
+			desc:      "remove one address too many",
 			actions:   append(okActions, action{remove: "qux"}),
 			lastFails: true,
+			delay:     2 * removeDelay,
 		},
 		{
 			desc: "remove unknown address",
 			actions: []action{
 				{add: "foo"},
+				{add: "qux"},
+				{add: "baz"},
 				{remove: "bar"},
 			},
 			lastFails: true,
+			delay:     2 * removeDelay,
+		},
+		{
+			// This relies on the implementation detail that the first address added
+			// to the balancer is the standby. The standby cannot be removed.
+			desc: "remove standby address",
+			actions: []action{
+				{add: "foo"},
+				{add: "qux"},
+				{add: "baz"},
+				{remove: "foo"},
+			},
+			lastFails: true,
+			delay:     2 * removeDelay,
 		},
 	}
 
 	for _, tc := range testCases {
 		t.Run(tc.desc, func(t *testing.T) {
-			// This breaks integration with gRPC and causes a monitor goroutine leak.
-			// Not a problem for this test.
-			lbBuilder = newBuilder()
+			lbBuilder.testingRestart <- struct{}{}
+			time.Sleep(2 * removeDelay) // wait for lastRemoval in monitor goroutine to be long enough ago
 
 			for i, a := range tc.actions {
 				if a.add != "" {
 					AddAddress(a.add)
 				} else {
+					if tc.delay > 0 {
+						time.Sleep(tc.delay)
+					}
+
 					expected := true
 					if i+1 == len(tc.actions) && tc.lastFails {
 						expected = false
@@ -65,3 +182,55 @@ type action struct {
 	add    string
 	remove string
 }
+
+type testClientConn struct {
+	addrUpdates   [][]resolver.Address
+	configUpdates []string
+	mu            sync.Mutex
+}
+
+func (tcc *testClientConn) NewAddress(addresses []resolver.Address) {
+	tcc.mu.Lock()
+	defer tcc.mu.Unlock()
+
+	tcc.addrUpdates = append(tcc.addrUpdates, addresses)
+}
+
+func (tcc *testClientConn) NewServiceConfig(serviceConfig string) {
+	tcc.mu.Lock()
+	defer tcc.mu.Unlock()
+
+	tcc.configUpdates = append(tcc.configUpdates, serviceConfig)
+}
+
+func (tcc *testClientConn) AddrUpdates() [][]resolver.Address {
+	tcc.mu.Lock()
+	defer tcc.mu.Unlock()
+
+	return tcc.addrUpdates
+}
+
+func (tcc *testClientConn) ConfigUpdates() []string {
+	tcc.mu.Lock()
+	defer tcc.mu.Unlock()
+
+	return tcc.configUpdates
+}
+
+// configureBuilderTest reconfigures the global builder and pre-populates
+// it with addresses. It returns the list of addresses it added.
+func configureBuilderTest(numAddrs int) []string {
+	delay := 1 * time.Millisecond
+	ConfigureBuilder(numAddrs, delay)
+	lbBuilder.testingRestart <- struct{}{}
+	time.Sleep(2 * delay)
+
+	var addrs []string
+	for i := 0; i < numAddrs; i++ {
+		a := fmt.Sprintf("test.%d", i)
+		AddAddress(a)
+		addrs = append(addrs, a)
+	}
+
+	return addrs
+}
diff --git a/internal/rubyserver/balancer/pool.go b/internal/rubyserver/balancer/pool.go
new file mode 100644
index 000000000..f34097990
--- /dev/null
+++ b/internal/rubyserver/balancer/pool.go
@@ -0,0 +1,59 @@
+package balancer
+
+func newPool() *pool {
+	return &pool{active: make(map[string]struct{})}
+}
+
+// pool is a set that keeps one address (element) set aside as a standby.
+// This data structure is not thread safe.
+type pool struct {
+	standby string
+	active  map[string]struct{}
+}
+
+// add is idempotent. If there is no standby address yet, addr becomes
+// the standby.
+func (p *pool) add(addr string) {
+	if _, ok := p.active[addr]; ok || p.standby == addr {
+		return
+	}
+
+	if p.standby == "" {
+		p.standby = addr
+		return
+	}
+
+	p.active[addr] = struct{}{}
+}
+
+func (p *pool) activeSize() int {
+	return len(p.active)
+}
+
+// remove tries to remove addr from the active addresses. If addr is not
+// known or not active, remove returns false.
+func (p *pool) remove(addr string) bool {
+	if _, ok := p.active[addr]; !ok || p.standby == "" {
+		return false
+	}
+
+	delete(p.active, addr)
+
+	// Promote the standby to an active address
+	p.active[p.standby] = struct{}{}
+	p.standby = ""
+
+	return true
+}
+
+// activeAddrs returns the currently active addresses as a list. The
+// order is not deterministic.
+func (p *pool) activeAddrs() []string {
+	var addrs []string
+
+	for a := range p.active {
+		addrs = append(addrs, a)
+	}
+
+	return addrs
+}
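
The standby bookkeeping is the subtle part of pool, so here is a walkthrough of the semantics as read off the implementation above (imagined as a helper inside package balancer, since the type is unexported):

func poolWalkthrough() {
	p := newPool()
	p.add("a") // no standby yet: "a" is set aside as standby, not active
	p.add("b") // active: {b}
	p.add("c") // active: {b, c}; p.activeSize() == 2

	p.remove("a") // false: "a" is the standby, not an active address
	p.remove("b") // true: "b" leaves and standby "a" is promoted; active: {a, c}
	p.remove("c") // false: the standby slot is now empty, so removal is refused
	p.add("d")    // "d" refills the standby slot, permitting one more removal
}

A successful remove swaps an active address for the standby, so activeSize never shrinks; the pool only loses capacity one removal at a time, and each removal must be paid for by a later add. Combined with the removeDelay throttle in monitor, this keeps a crash-looping gitaly-ruby worker from draining the whole pool.
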
diff --git a/internal/rubyserver/rubyserver.go b/internal/rubyserver/rubyserver.go
index 8b9f159dc..44cbbd8fc 100644
--- a/internal/rubyserver/rubyserver.go
+++ b/internal/rubyserver/rubyserver.go
@@ -116,8 +116,11 @@ func Start() (*Server, error) {
 
 	gitalyRuby := path.Join(cfg.Ruby.Dir, "bin/gitaly-ruby")
 
+	numWorkers := cfg.Ruby.NumWorkers
+	balancer.ConfigureBuilder(numWorkers, 0)
+
 	s := &Server{}
-	for i := 0; i < cfg.Ruby.NumWorkers; i++ {
+	for i := 0; i < numWorkers; i++ {
 		name := fmt.Sprintf("gitaly-ruby.%d", i)
 		socketPath := socketPath(i)
diff --git a/internal/testhelper/testhelper.go b/internal/testhelper/testhelper.go
index 10dd4507e..892c11b78 100644
--- a/internal/testhelper/testhelper.go
+++ b/internal/testhelper/testhelper.go
@@ -259,12 +259,6 @@ func ConfigureRuby() {
 	if err := config.ConfigureRuby(); err != nil {
 		log.Fatal("validate ruby config: %v", err)
 	}
-
-	// This speeds up test boot-up. The reason we use more than 1 worker in
-	// production is to handle memory leaks, but the tests don't run for long
-	// enough to hit those memory leaks anyway. Most tests also run faster
-	// than the minimum delay before a ruby worker failover can even happen.
-	config.Config.Ruby.NumWorkers = 1
 }
 
 // NewTestGrpcServer creates a GRPC Server for testing purposes