From aff01a0e09974d7019d49753077742b66a3cca9c Mon Sep 17 00:00:00 2001 From: "Jacob Vosmaer (GitLab)" Date: Wed, 16 May 2018 13:56:31 +0000 Subject: Use round robin load balancing instead of 'pick first' for gitaly-ruby --- internal/rubyserver/balancer/balancer.go | 84 +++++++++--- internal/rubyserver/balancer/balancer_test.go | 177 +++++++++++++++++++++++++- internal/rubyserver/balancer/pool.go | 59 +++++++++ internal/rubyserver/rubyserver.go | 5 +- 4 files changed, 305 insertions(+), 20 deletions(-) create mode 100644 internal/rubyserver/balancer/pool.go (limited to 'internal/rubyserver') diff --git a/internal/rubyserver/balancer/balancer.go b/internal/rubyserver/balancer/balancer.go index bceb53338..4ba136244 100644 --- a/internal/rubyserver/balancer/balancer.go +++ b/internal/rubyserver/balancer/balancer.go @@ -21,6 +21,8 @@ package balancer // import ( + "time" + "google.golang.org/grpc/resolver" ) @@ -32,6 +34,11 @@ func init() { resolver.Register(lbBuilder) } +const ( + // DefaultRemoveDelay is the minimum time between successive address removals. + DefaultRemoveDelay = 1 * time.Minute +) + // AddAddress adds the address of a gitaly-ruby instance to the load // balancer. func AddAddress(a string) { @@ -57,10 +64,38 @@ type addressUpdate struct { next chan struct{} } +type config struct { + numAddrs int + removeDelay time.Duration +} + type builder struct { addAddress chan string removeAddress chan addressRemoval addressUpdates chan addressUpdate + configUpdate chan config + + // for testing only + testingRestart chan struct{} +} + +// ConfigureBuilder changes the configuration of the global balancer +// instance. All calls that interact with the balancer will block until +// ConfigureBuilder has been called at least once. +func ConfigureBuilder(numAddrs int, removeDelay time.Duration) { + cfg := config{ + numAddrs: numAddrs, + removeDelay: removeDelay, + } + + if cfg.removeDelay <= 0 { + cfg.removeDelay = DefaultRemoveDelay + } + if numAddrs <= 0 { + panic("numAddrs must be at least 1") + } + + lbBuilder.configUpdate <- cfg } func newBuilder() *builder { @@ -68,6 +103,8 @@ func newBuilder() *builder { addAddress: make(chan string), removeAddress: make(chan addressRemoval), addressUpdates: make(chan addressUpdate), + configUpdate: make(chan config), + testingRestart: make(chan struct{}), } go b.monitor() @@ -83,43 +120,60 @@ func (*builder) Scheme() string { return Scheme } // care what "address" the caller wants to resolve. We always resolve to // the current list of address for local gitaly-ruby processes. func (b *builder) Build(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOption) (resolver.Resolver, error) { - // JV: Normally I would delete this but this is very poorly documented, - // and I don't want to have to look up the magic words again. In case we - // ever want to do round-robin. - // cc.NewServiceConfig(`{"LoadBalancingPolicy":"round_robin"}`) - + cc.NewServiceConfig(`{"LoadBalancingPolicy":"round_robin"}`) return newGitalyResolver(cc, b.addressUpdates), nil } // monitor serves address list requests and handles address updates. func (b *builder) monitor() { - addresses := make(map[string]struct{}) + p := newPool() notify := make(chan struct{}) + cfg := <-b.configUpdate + lastRemoval := time.Now() + + // This channel is intentionally nil so that our 'select' below won't + // send messages to it. We do this to prevent sending out invalid (empty) + // messages during boot. + var addressUpdates chan addressUpdate for { au := addressUpdate{next: notify} - for a := range addresses { + for _, a := range p.activeAddrs() { au.addrs = append(au.addrs, resolver.Address{Addr: a}) } + if len(au.addrs) > 0 && addressUpdates == nil { + // Start listening for address update requests + addressUpdates = b.addressUpdates + } + select { - case b.addressUpdates <- au: - if len(au.addrs) == 0 { - panic("builder monitor sent empty address update") - } + case addressUpdates <- au: + // We have served an address update request case addr := <-b.addAddress: - addresses[addr] = struct{}{} + p.add(addr) + notify = broadcast(notify) case removal := <-b.removeAddress: - _, addressKnown := addresses[removal.addr] - if !addressKnown || len(addresses) <= 1 { + if time.Since(lastRemoval) < cfg.removeDelay || p.activeSize() < cfg.numAddrs-1 { + removal.ok <- false + break + } + + if !p.remove(removal.addr) { removal.ok <- false break } - delete(addresses, removal.addr) removal.ok <- true + lastRemoval = time.Now() notify = broadcast(notify) + case cfg = <-b.configUpdate: + // We have received a config update + case <-b.testingRestart: + go b.monitor() + b.configUpdate <- cfg + return } } } diff --git a/internal/rubyserver/balancer/balancer_test.go b/internal/rubyserver/balancer/balancer_test.go index a4d13493e..637b1d8f2 100644 --- a/internal/rubyserver/balancer/balancer_test.go +++ b/internal/rubyserver/balancer/balancer_test.go @@ -1,54 +1,171 @@ package balancer import ( + "encoding/json" + "fmt" + "strings" + "sync" "testing" + "time" "github.com/stretchr/testify/require" + "google.golang.org/grpc/resolver" ) +func TestServiceConfig(t *testing.T) { + configureBuilderTest(3) + + tcc := &testClientConn{} + lbBuilder.Build(resolver.Target{}, tcc, resolver.BuildOption{}) + + configUpdates := tcc.ConfigUpdates() + require.Len(t, configUpdates, 1, "expect exactly one config update") + + svcConfig := struct{ LoadBalancingPolicy string }{} + require.NoError(t, json.NewDecoder(strings.NewReader(configUpdates[0])).Decode(&svcConfig)) + require.Equal(t, "round_robin", svcConfig.LoadBalancingPolicy) +} + +func TestAddressUpdatesSmallestPool(t *testing.T) { + // The smallest number of addresses is 2: 1 standby, and 1 active. + addrs := configureBuilderTest(2) + + tcc := &testClientConn{} + lbBuilder.Build(resolver.Target{}, tcc, resolver.BuildOption{}) + + // Simulate some random updates + RemoveAddress(addrs[0]) + RemoveAddress(addrs[0]) + AddAddress(addrs[0]) + RemoveAddress(addrs[1]) + RemoveAddress(addrs[0]) + AddAddress(addrs[1]) + AddAddress(addrs[1]) + RemoveAddress(addrs[1]) + RemoveAddress(addrs[1]) + RemoveAddress(addrs[1]) + RemoveAddress(addrs[0]) + AddAddress(addrs[0]) + + addrUpdates := tcc.AddrUpdates() + require.True(t, len(addrUpdates) > 0, "expected at least one address update") + + expectedActive := len(addrs) - 1 // subtract 1 for the standby + for _, update := range addrUpdates { + require.Equal(t, expectedActive, len(update)) + } +} + +func TestAddressUpdatesRoundRobinPool(t *testing.T) { + // With 3 addresses in the pool, 2 will be active. + addrs := configureBuilderTest(3) + + tcc := &testClientConn{} + lbBuilder.Build(resolver.Target{}, tcc, resolver.BuildOption{}) + + // Simulate some random updates + RemoveAddress(addrs[0]) + RemoveAddress(addrs[0]) + RemoveAddress(addrs[2]) + AddAddress(addrs[0]) + RemoveAddress(addrs[1]) + RemoveAddress(addrs[0]) + AddAddress(addrs[2]) + AddAddress(addrs[1]) + AddAddress(addrs[1]) + RemoveAddress(addrs[1]) + RemoveAddress(addrs[2]) + RemoveAddress(addrs[1]) + AddAddress(addrs[1]) + RemoveAddress(addrs[2]) + RemoveAddress(addrs[1]) + RemoveAddress(addrs[0]) + AddAddress(addrs[0]) + + addrUpdates := tcc.AddrUpdates() + require.True(t, len(addrUpdates) > 0, "expected at least one address update") + + expectedActive := len(addrs) - 1 // subtract 1 for the standby + for _, update := range addrUpdates { + require.Equal(t, expectedActive, len(update)) + } +} + func TestRemovals(t *testing.T) { okActions := []action{ {add: "foo"}, {add: "bar"}, {add: "qux"}, {remove: "bar"}, + {add: "baz"}, {remove: "foo"}, } + numAddr := 3 + removeDelay := 1 * time.Millisecond + ConfigureBuilder(numAddr, removeDelay) testCases := []struct { desc string actions []action lastFails bool + delay time.Duration }{ { desc: "add then remove", actions: okActions, + delay: 2 * removeDelay, }, { - desc: "remove last address", + desc: "add then remove but too fast", + actions: okActions, + lastFails: true, + delay: 0, + }, + { + desc: "remove one address too many", actions: append(okActions, action{remove: "qux"}), lastFails: true, + delay: 2 * removeDelay, }, { desc: "remove unknown address", actions: []action{ {add: "foo"}, + {add: "qux"}, + {add: "baz"}, {remove: "bar"}, }, lastFails: true, + delay: 2 * removeDelay, + }, + { + // This relies on the implementation detail that the first address added + // to the balancer is the standby. The standby cannot be removed. + desc: "remove standby address", + actions: []action{ + {add: "foo"}, + {add: "qux"}, + {add: "baz"}, + {remove: "foo"}, + }, + lastFails: true, + delay: 2 * removeDelay, }, } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - // This breaks integration with gRPC and causes a monitor goroutine leak. - // Not a problem for this test. - lbBuilder = newBuilder() + lbBuilder.testingRestart <- struct{}{} + time.Sleep(2 * removeDelay) // wait for lastRemoval in monitor goroutine to be long enough ago for i, a := range tc.actions { if a.add != "" { AddAddress(a.add) } else { + if tc.delay > 0 { + time.Sleep(tc.delay) + } + expected := true if i+1 == len(tc.actions) && tc.lastFails { expected = false @@ -65,3 +182,55 @@ type action struct { add string remove string } + +type testClientConn struct { + addrUpdates [][]resolver.Address + configUpdates []string + mu sync.Mutex +} + +func (tcc *testClientConn) NewAddress(addresses []resolver.Address) { + tcc.mu.Lock() + defer tcc.mu.Unlock() + + tcc.addrUpdates = append(tcc.addrUpdates, addresses) +} + +func (tcc *testClientConn) NewServiceConfig(serviceConfig string) { + tcc.mu.Lock() + defer tcc.mu.Unlock() + + tcc.configUpdates = append(tcc.configUpdates, serviceConfig) +} + +func (tcc *testClientConn) AddrUpdates() [][]resolver.Address { + tcc.mu.Lock() + defer tcc.mu.Unlock() + + return tcc.addrUpdates +} + +func (tcc *testClientConn) ConfigUpdates() []string { + tcc.mu.Lock() + defer tcc.mu.Unlock() + + return tcc.configUpdates +} + +// configureBuilderTest reconfigures the global builder and pre-populates +// it with addresses. It returns the list of addresses it added. +func configureBuilderTest(numAddrs int) []string { + delay := 1 * time.Millisecond + ConfigureBuilder(numAddrs, delay) + lbBuilder.testingRestart <- struct{}{} + time.Sleep(2 * delay) + + var addrs []string + for i := 0; i < numAddrs; i++ { + a := fmt.Sprintf("test.%d", i) + AddAddress(a) + addrs = append(addrs, a) + } + + return addrs +} diff --git a/internal/rubyserver/balancer/pool.go b/internal/rubyserver/balancer/pool.go new file mode 100644 index 000000000..f34097990 --- /dev/null +++ b/internal/rubyserver/balancer/pool.go @@ -0,0 +1,59 @@ +package balancer + +func newPool() *pool { + return &pool{active: make(map[string]struct{})} +} + +// pool is a set that keeps one address (element) set aside as a standby. +// This data structure is not thread safe. +type pool struct { + standby string + active map[string]struct{} +} + +// add is idempotent. If there is no standby address yet, addr becomes +// the standby. +func (p *pool) add(addr string) { + if _, ok := p.active[addr]; ok || p.standby == addr { + return + } + + if p.standby == "" { + p.standby = addr + return + } + + p.active[addr] = struct{}{} +} + +func (p *pool) activeSize() int { + return len(p.active) +} + +// remove tries to remove addr from the active addresses. If addr is not +// known or not active, remove returns false. +func (p *pool) remove(addr string) bool { + if _, ok := p.active[addr]; !ok || p.standby == "" { + return false + } + + delete(p.active, addr) + + // Promote the standby to an active address + p.active[p.standby] = struct{}{} + p.standby = "" + + return true +} + +// activeAddrs returns the currently active addresses as a list. The +// order is not deterministic. +func (p *pool) activeAddrs() []string { + var addrs []string + + for a := range p.active { + addrs = append(addrs, a) + } + + return addrs +} diff --git a/internal/rubyserver/rubyserver.go b/internal/rubyserver/rubyserver.go index 8b9f159dc..44cbbd8fc 100644 --- a/internal/rubyserver/rubyserver.go +++ b/internal/rubyserver/rubyserver.go @@ -116,8 +116,11 @@ func Start() (*Server, error) { gitalyRuby := path.Join(cfg.Ruby.Dir, "bin/gitaly-ruby") + numWorkers := cfg.Ruby.NumWorkers + balancer.ConfigureBuilder(numWorkers, 0) + s := &Server{} - for i := 0; i < cfg.Ruby.NumWorkers; i++ { + for i := 0; i < numWorkers; i++ { name := fmt.Sprintf("gitaly-ruby.%d", i) socketPath := socketPath(i) -- cgit v1.2.3