gitlab.com/gitlab-org/gitaly.git

author     Zeger-Jan van de Weg <zegerjan@gitlab.com>  2018-05-16 16:56:31 +0300
committer  Zeger-Jan van de Weg <zegerjan@gitlab.com>  2018-05-16 16:56:31 +0300
commit     0e8cb5ad07960952a94c7eb0ec3167cccd7ae9ec (patch)
tree       d64175229e45583de9eafed630ba67147d8b63eb
parent     c5a643aea441f165b46b40f4eda283d9dec59184 (diff)
parent     aff01a0e09974d7019d49753077742b66a3cca9c (diff)

Merge branch 'gitaly-ruby-round-robin' into 'master'

Use round robin load balancing instead of 'pick first' for gitaly-ruby

Closes #1167

See merge request gitlab-org/gitaly!700
-rw-r--r--  CHANGELOG.md                                     2
-rw-r--r--  config.toml.example                              3
-rw-r--r--  doc/configuration/README.md                      2
-rw-r--r--  internal/rubyserver/balancer/balancer.go        84
-rw-r--r--  internal/rubyserver/balancer/balancer_test.go  177
-rw-r--r--  internal/rubyserver/balancer/pool.go            59
-rw-r--r--  internal/rubyserver/rubyserver.go                5
-rw-r--r--  internal/testhelper/testhelper.go                6
8 files changed, 311 insertions(+), 27 deletions(-)
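The balancer change below drops gRPC's default "pick first" behaviour by pushing an explicit round_robin service config from a custom resolver. For orientation, here is a minimal, standalone sketch of that resolver pattern using the same grpc-go resolver API the diff relies on (resolver.Register, ClientConn.NewAddress, ClientConn.NewServiceConfig); the scheme name, type names, and addresses are placeholders, and the real implementation is the balancer package further down in this merge.

```go
// Minimal sketch of a custom resolver that announces a fixed address list
// and asks gRPC to round-robin across it. Not gitaly code.
package main

import (
	"google.golang.org/grpc/resolver"
)

const exampleScheme = "example-lb" // hypothetical scheme

type exampleBuilder struct {
	addrs []string
}

func (b *exampleBuilder) Scheme() string { return exampleScheme }

// Build pushes a round_robin service config and the full address list, so the
// client connection spreads calls across every address instead of sticking to
// the first one that connects.
func (b *exampleBuilder) Build(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOption) (resolver.Resolver, error) {
	cc.NewServiceConfig(`{"LoadBalancingPolicy":"round_robin"}`)

	var addrs []resolver.Address
	for _, a := range b.addrs {
		addrs = append(addrs, resolver.Address{Addr: a})
	}
	cc.NewAddress(addrs)

	return nopResolver{}, nil
}

type nopResolver struct{}

func (nopResolver) ResolveNow(resolver.ResolveNowOption) {}
func (nopResolver) Close()                               {}

func main() {
	// Registering the builder makes "example-lb:///anything" dialable with
	// grpc.Dial; this resolver ignores the target path.
	resolver.Register(&exampleBuilder{addrs: []string{"unix:/tmp/sock0", "unix:/tmp/sock1"}})
}
```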
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9967b902a..adbb71c2f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,8 @@
UNRELEASED
+- Use round robin load balancing instead of 'pick first' for gitaly-ruby
+ https://gitlab.com/gitlab-org/gitaly/merge_requests/700
- Unvendor Repository#create implementation
https://gitlab.com/gitlab-org/gitaly/merge_requests/713
- Add gitaly-ruby installation debug log messages
diff --git a/config.toml.example b/config.toml.example
index b47ce94eb..76e0ea047 100644
--- a/config.toml.example
+++ b/config.toml.example
@@ -56,6 +56,9 @@ dir = "/home/git/gitaly/ruby"
#
# # Time that gitaly-ruby memory must remain high before a restart (seconds)
# restart_delay = "5m"
+#
+# # Number of gitaly-ruby worker processes
+# num_workers = 2
[gitlab-shell]
# The directory where gitlab-shell is installed
diff --git a/doc/configuration/README.md b/doc/configuration/README.md
index 88ac51b1e..337e11f87 100644
--- a/doc/configuration/README.md
+++ b/doc/configuration/README.md
@@ -110,7 +110,7 @@ max\_rss limit.
|max_rss|integer|no|Resident set size limit that triggers a gitaly-ruby restart, in bytes. Default 300MB.|
|graceful_restart_timeout|string|no|Grace period to allow a gitaly-ruby process to finish ongoing requests. Default 10 minutes ("10m").|
|restart_delay|string|no|Time memory must be high before a restart is triggered, in seconds. Default 5 minutes ("5m").|
-|num_workers|integer|no|Number of gitaly-ruby worker processes. Default 2, minimum 2.|
+|num_workers|integer|no|Number of gitaly-ruby worker processes. Try increasing this number in case of ResourceExhausted errors. Default 2, minimum 2.|
|linguist_languages_path|string|no|Override for dynamic languages.json discovery. Default: "" (use dynamic discovery).|
### Logging
diff --git a/internal/rubyserver/balancer/balancer.go b/internal/rubyserver/balancer/balancer.go
index bceb53338..4ba136244 100644
--- a/internal/rubyserver/balancer/balancer.go
+++ b/internal/rubyserver/balancer/balancer.go
@@ -21,6 +21,8 @@ package balancer
//
import (
+ "time"
+
"google.golang.org/grpc/resolver"
)
@@ -32,6 +34,11 @@ func init() {
resolver.Register(lbBuilder)
}
+const (
+ // DefaultRemoveDelay is the minimum time between successive address removals.
+ DefaultRemoveDelay = 1 * time.Minute
+)
+
// AddAddress adds the address of a gitaly-ruby instance to the load
// balancer.
func AddAddress(a string) {
@@ -57,10 +64,38 @@ type addressUpdate struct {
next chan struct{}
}
+type config struct {
+ numAddrs int
+ removeDelay time.Duration
+}
+
type builder struct {
addAddress chan string
removeAddress chan addressRemoval
addressUpdates chan addressUpdate
+ configUpdate chan config
+
+ // for testing only
+ testingRestart chan struct{}
+}
+
+// ConfigureBuilder changes the configuration of the global balancer
+// instance. All calls that interact with the balancer will block until
+// ConfigureBuilder has been called at least once.
+func ConfigureBuilder(numAddrs int, removeDelay time.Duration) {
+ cfg := config{
+ numAddrs: numAddrs,
+ removeDelay: removeDelay,
+ }
+
+ if cfg.removeDelay <= 0 {
+ cfg.removeDelay = DefaultRemoveDelay
+ }
+ if numAddrs <= 0 {
+ panic("numAddrs must be at least 1")
+ }
+
+ lbBuilder.configUpdate <- cfg
}
func newBuilder() *builder {
@@ -68,6 +103,8 @@ func newBuilder() *builder {
addAddress: make(chan string),
removeAddress: make(chan addressRemoval),
addressUpdates: make(chan addressUpdate),
+ configUpdate: make(chan config),
+ testingRestart: make(chan struct{}),
}
go b.monitor()
@@ -83,43 +120,60 @@ func (*builder) Scheme() string { return Scheme }
// care what "address" the caller wants to resolve. We always resolve to
// the current list of addresses for local gitaly-ruby processes.
func (b *builder) Build(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOption) (resolver.Resolver, error) {
- // JV: Normally I would delete this but this is very poorly documented,
- // and I don't want to have to look up the magic words again. In case we
- // ever want to do round-robin.
- // cc.NewServiceConfig(`{"LoadBalancingPolicy":"round_robin"}`)
-
+ cc.NewServiceConfig(`{"LoadBalancingPolicy":"round_robin"}`)
return newGitalyResolver(cc, b.addressUpdates), nil
}
// monitor serves address list requests and handles address updates.
func (b *builder) monitor() {
- addresses := make(map[string]struct{})
+ p := newPool()
notify := make(chan struct{})
+ cfg := <-b.configUpdate
+ lastRemoval := time.Now()
+
+ // This channel is intentionally nil so that our 'select' below won't
+ // send messages to it. We do this to prevent sending out invalid (empty)
+ // messages during boot.
+ var addressUpdates chan addressUpdate
for {
au := addressUpdate{next: notify}
- for a := range addresses {
+ for _, a := range p.activeAddrs() {
au.addrs = append(au.addrs, resolver.Address{Addr: a})
}
+ if len(au.addrs) > 0 && addressUpdates == nil {
+ // Start listening for address update requests
+ addressUpdates = b.addressUpdates
+ }
+
select {
- case b.addressUpdates <- au:
- if len(au.addrs) == 0 {
- panic("builder monitor sent empty address update")
- }
+ case addressUpdates <- au:
+ // We have served an address update request
case addr := <-b.addAddress:
- addresses[addr] = struct{}{}
+ p.add(addr)
+
notify = broadcast(notify)
case removal := <-b.removeAddress:
- _, addressKnown := addresses[removal.addr]
- if !addressKnown || len(addresses) <= 1 {
+ if time.Since(lastRemoval) < cfg.removeDelay || p.activeSize() < cfg.numAddrs-1 {
+ removal.ok <- false
+ break
+ }
+
+ if !p.remove(removal.addr) {
removal.ok <- false
break
}
- delete(addresses, removal.addr)
removal.ok <- true
+ lastRemoval = time.Now()
notify = broadcast(notify)
+ case cfg = <-b.configUpdate:
+ // We have received a config update
+ case <-b.testingRestart:
+ go b.monitor()
+ b.configUpdate <- cfg
+ return
}
}
}
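One detail worth calling out in the monitor() rewrite above: the local addressUpdates variable starts out nil and is only pointed at b.addressUpdates once the pool has at least one active address. In Go, a select case on a nil channel is never ready, so the old runtime panic on empty updates is replaced by simply not serving address updates until there is something valid to send. A small standalone illustration of that nil-channel pattern (variable names are mine):

```go
package main

import "fmt"

func main() {
	updates := make(chan string, 1)

	// While the variable is nil, the send case below can never be chosen.
	var out chan string

	select {
	case out <- "empty update":
		fmt.Println("never reached: a nil channel case is never ready")
	default:
		fmt.Println("send case disabled while the channel is nil")
	}

	// Point the same variable at a real channel and the case becomes live.
	out = updates
	select {
	case out <- "real update":
		fmt.Println("sent:", <-updates)
	default:
		fmt.Println("unexpected")
	}
}
```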
diff --git a/internal/rubyserver/balancer/balancer_test.go b/internal/rubyserver/balancer/balancer_test.go
index a4d13493e..637b1d8f2 100644
--- a/internal/rubyserver/balancer/balancer_test.go
+++ b/internal/rubyserver/balancer/balancer_test.go
@@ -1,54 +1,171 @@
package balancer
import (
+ "encoding/json"
+ "fmt"
+ "strings"
+ "sync"
"testing"
+ "time"
"github.com/stretchr/testify/require"
+ "google.golang.org/grpc/resolver"
)
+func TestServiceConfig(t *testing.T) {
+ configureBuilderTest(3)
+
+ tcc := &testClientConn{}
+ lbBuilder.Build(resolver.Target{}, tcc, resolver.BuildOption{})
+
+ configUpdates := tcc.ConfigUpdates()
+ require.Len(t, configUpdates, 1, "expect exactly one config update")
+
+ svcConfig := struct{ LoadBalancingPolicy string }{}
+ require.NoError(t, json.NewDecoder(strings.NewReader(configUpdates[0])).Decode(&svcConfig))
+ require.Equal(t, "round_robin", svcConfig.LoadBalancingPolicy)
+}
+
+func TestAddressUpdatesSmallestPool(t *testing.T) {
+ // The smallest number of addresses is 2: 1 standby, and 1 active.
+ addrs := configureBuilderTest(2)
+
+ tcc := &testClientConn{}
+ lbBuilder.Build(resolver.Target{}, tcc, resolver.BuildOption{})
+
+ // Simulate some random updates
+ RemoveAddress(addrs[0])
+ RemoveAddress(addrs[0])
+ AddAddress(addrs[0])
+ RemoveAddress(addrs[1])
+ RemoveAddress(addrs[0])
+ AddAddress(addrs[1])
+ AddAddress(addrs[1])
+ RemoveAddress(addrs[1])
+ RemoveAddress(addrs[1])
+ RemoveAddress(addrs[1])
+ RemoveAddress(addrs[0])
+ AddAddress(addrs[0])
+
+ addrUpdates := tcc.AddrUpdates()
+ require.True(t, len(addrUpdates) > 0, "expected at least one address update")
+
+ expectedActive := len(addrs) - 1 // subtract 1 for the standby
+ for _, update := range addrUpdates {
+ require.Equal(t, expectedActive, len(update))
+ }
+}
+
+func TestAddressUpdatesRoundRobinPool(t *testing.T) {
+ // With 3 addresses in the pool, 2 will be active.
+ addrs := configureBuilderTest(3)
+
+ tcc := &testClientConn{}
+ lbBuilder.Build(resolver.Target{}, tcc, resolver.BuildOption{})
+
+ // Simulate some random updates
+ RemoveAddress(addrs[0])
+ RemoveAddress(addrs[0])
+ RemoveAddress(addrs[2])
+ AddAddress(addrs[0])
+ RemoveAddress(addrs[1])
+ RemoveAddress(addrs[0])
+ AddAddress(addrs[2])
+ AddAddress(addrs[1])
+ AddAddress(addrs[1])
+ RemoveAddress(addrs[1])
+ RemoveAddress(addrs[2])
+ RemoveAddress(addrs[1])
+ AddAddress(addrs[1])
+ RemoveAddress(addrs[2])
+ RemoveAddress(addrs[1])
+ RemoveAddress(addrs[0])
+ AddAddress(addrs[0])
+
+ addrUpdates := tcc.AddrUpdates()
+ require.True(t, len(addrUpdates) > 0, "expected at least one address update")
+
+ expectedActive := len(addrs) - 1 // subtract 1 for the standby
+ for _, update := range addrUpdates {
+ require.Equal(t, expectedActive, len(update))
+ }
+}
+
func TestRemovals(t *testing.T) {
okActions := []action{
{add: "foo"},
{add: "bar"},
{add: "qux"},
{remove: "bar"},
+ {add: "baz"},
{remove: "foo"},
}
+ numAddr := 3
+ removeDelay := 1 * time.Millisecond
+ ConfigureBuilder(numAddr, removeDelay)
testCases := []struct {
desc string
actions []action
lastFails bool
+ delay time.Duration
}{
{
desc: "add then remove",
actions: okActions,
+ delay: 2 * removeDelay,
},
{
- desc: "remove last address",
+ desc: "add then remove but too fast",
+ actions: okActions,
+ lastFails: true,
+ delay: 0,
+ },
+ {
+ desc: "remove one address too many",
actions: append(okActions, action{remove: "qux"}),
lastFails: true,
+ delay: 2 * removeDelay,
},
{
desc: "remove unknown address",
actions: []action{
{add: "foo"},
+ {add: "qux"},
+ {add: "baz"},
{remove: "bar"},
},
lastFails: true,
+ delay: 2 * removeDelay,
+ },
+ {
+ // This relies on the implementation detail that the first address added
+ // to the balancer is the standby. The standby cannot be removed.
+ desc: "remove standby address",
+ actions: []action{
+ {add: "foo"},
+ {add: "qux"},
+ {add: "baz"},
+ {remove: "foo"},
+ },
+ lastFails: true,
+ delay: 2 * removeDelay,
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
- // This breaks integration with gRPC and causes a monitor goroutine leak.
- // Not a problem for this test.
- lbBuilder = newBuilder()
+ lbBuilder.testingRestart <- struct{}{}
+ time.Sleep(2 * removeDelay) // wait for lastRemoval in monitor goroutine to be long enough ago
for i, a := range tc.actions {
if a.add != "" {
AddAddress(a.add)
} else {
+ if tc.delay > 0 {
+ time.Sleep(tc.delay)
+ }
+
expected := true
if i+1 == len(tc.actions) && tc.lastFails {
expected = false
@@ -65,3 +182,55 @@ type action struct {
add string
remove string
}
+
+type testClientConn struct {
+ addrUpdates [][]resolver.Address
+ configUpdates []string
+ mu sync.Mutex
+}
+
+func (tcc *testClientConn) NewAddress(addresses []resolver.Address) {
+ tcc.mu.Lock()
+ defer tcc.mu.Unlock()
+
+ tcc.addrUpdates = append(tcc.addrUpdates, addresses)
+}
+
+func (tcc *testClientConn) NewServiceConfig(serviceConfig string) {
+ tcc.mu.Lock()
+ defer tcc.mu.Unlock()
+
+ tcc.configUpdates = append(tcc.configUpdates, serviceConfig)
+}
+
+func (tcc *testClientConn) AddrUpdates() [][]resolver.Address {
+ tcc.mu.Lock()
+ defer tcc.mu.Unlock()
+
+ return tcc.addrUpdates
+}
+
+func (tcc *testClientConn) ConfigUpdates() []string {
+ tcc.mu.Lock()
+ defer tcc.mu.Unlock()
+
+ return tcc.configUpdates
+}
+
+// configureBuilderTest reconfigures the global builder and pre-populates
+// it with addresses. It returns the list of addresses it added.
+func configureBuilderTest(numAddrs int) []string {
+ delay := 1 * time.Millisecond
+ ConfigureBuilder(numAddrs, delay)
+ lbBuilder.testingRestart <- struct{}{}
+ time.Sleep(2 * delay)
+
+ var addrs []string
+ for i := 0; i < numAddrs; i++ {
+ a := fmt.Sprintf("test.%d", i)
+ AddAddress(a)
+ addrs = append(addrs, a)
+ }
+
+ return addrs
+}
diff --git a/internal/rubyserver/balancer/pool.go b/internal/rubyserver/balancer/pool.go
new file mode 100644
index 000000000..f34097990
--- /dev/null
+++ b/internal/rubyserver/balancer/pool.go
@@ -0,0 +1,59 @@
+package balancer
+
+func newPool() *pool {
+ return &pool{active: make(map[string]struct{})}
+}
+
+// pool is a set that keeps one address (element) set aside as a standby.
+// This data structure is not thread safe.
+type pool struct {
+ standby string
+ active map[string]struct{}
+}
+
+// add is idempotent. If there is no standby address yet, addr becomes
+// the standby.
+func (p *pool) add(addr string) {
+ if _, ok := p.active[addr]; ok || p.standby == addr {
+ return
+ }
+
+ if p.standby == "" {
+ p.standby = addr
+ return
+ }
+
+ p.active[addr] = struct{}{}
+}
+
+func (p *pool) activeSize() int {
+ return len(p.active)
+}
+
+// remove tries to remove addr from the active addresses. If addr is not
+// known or not active, remove returns false.
+func (p *pool) remove(addr string) bool {
+ if _, ok := p.active[addr]; !ok || p.standby == "" {
+ return false
+ }
+
+ delete(p.active, addr)
+
+ // Promote the standby to an active address
+ p.active[p.standby] = struct{}{}
+ p.standby = ""
+
+ return true
+}
+
+// activeAddrs returns the currently active addresses as a list. The
+// order is not deterministic.
+func (p *pool) activeAddrs() []string {
+ var addrs []string
+
+ for a := range p.active {
+ addrs = append(addrs, a)
+ }
+
+ return addrs
+}
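The new pool type is what enforces the "one standby" invariant the tests above rely on: the first address added becomes the standby, removals only succeed for active addresses, and each successful removal consumes the standby by promoting it. A short sketch of those semantics as they could be exercised from inside the balancer package (the function name is mine; pool and newPool are unexported):

```go
package balancer

import "fmt"

// demoPoolStandby is an illustrative helper, not part of the merge request.
func demoPoolStandby() {
	p := newPool()

	p.add("sock0") // the first address becomes the standby
	p.add("sock1") // later addresses become active
	p.add("sock2")

	fmt.Println(p.activeSize())    // 2: "sock1" and "sock2"
	fmt.Println(p.remove("sock0")) // false: the standby itself cannot be removed
	fmt.Println(p.remove("sock1")) // true: "sock0" is promoted from standby to active
	fmt.Println(p.remove("sock2")) // false: with no standby left, removal is refused
}
```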
diff --git a/internal/rubyserver/rubyserver.go b/internal/rubyserver/rubyserver.go
index 8b9f159dc..44cbbd8fc 100644
--- a/internal/rubyserver/rubyserver.go
+++ b/internal/rubyserver/rubyserver.go
@@ -116,8 +116,11 @@ func Start() (*Server, error) {
gitalyRuby := path.Join(cfg.Ruby.Dir, "bin/gitaly-ruby")
+ numWorkers := cfg.Ruby.NumWorkers
+ balancer.ConfigureBuilder(numWorkers, 0)
+
s := &Server{}
- for i := 0; i < cfg.Ruby.NumWorkers; i++ {
+ for i := 0; i < numWorkers; i++ {
name := fmt.Sprintf("gitaly-ruby.%d", i)
socketPath := socketPath(i)
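Putting the rubyserver hunk together with the balancer API introduced above, the start-up path now configures the balancer for the worker count before the workers are spawned. The sketch below is a condensed illustration of that sequence, not the verbatim Start() body; the rubyConfig type, the socket path format, and the per-worker AddAddress call are assumptions, since they are not shown in this hunk.

```go
// Condensed, assumed sketch of the start-up wiring; not the verbatim gitaly code.
package rubyserver

import (
	"fmt"

	"gitlab.com/gitlab-org/gitaly/internal/rubyserver/balancer"
)

// rubyConfig stands in for the relevant part of the real Ruby config struct.
type rubyConfig struct {
	NumWorkers int
}

func startWorkersSketch(cfg rubyConfig) {
	numWorkers := cfg.NumWorkers

	// Configure before any worker registers itself; a removeDelay of 0 falls
	// back to balancer.DefaultRemoveDelay (one minute).
	balancer.ConfigureBuilder(numWorkers, 0)

	for i := 0; i < numWorkers; i++ {
		addr := fmt.Sprintf("/path/to/gitaly-ruby.%d.socket", i) // placeholder path
		balancer.AddAddress(addr)                                // assumed registration per worker
	}
}
```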
diff --git a/internal/testhelper/testhelper.go b/internal/testhelper/testhelper.go
index 10dd4507e..892c11b78 100644
--- a/internal/testhelper/testhelper.go
+++ b/internal/testhelper/testhelper.go
@@ -259,12 +259,6 @@ func ConfigureRuby() {
if err := config.ConfigureRuby(); err != nil {
log.Fatal("validate ruby config: %v", err)
}
-
- // This speeds up test boot-up. The reason we use more than 1 worker in
- // production is to handle memory leaks, but the tests don't run for long
- // enough to hit those memory leaks anyway. Most tests also run faster
- // than the minimum delay before a ruby worker failover can even happen.
- config.Config.Ruby.NumWorkers = 1
}
// NewTestGrpcServer creates a GRPC Server for testing purposes