Welcome to mirror list, hosted at ThFree Co, Russian Federation.

balancer.go « balancer « rubyserver « internal - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 4ba136244fa5bce79928ff1d7faf5d5f21a526d3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
package balancer

// In this package we manage a global pool of addresses for gitaly-ruby,
// accessed via the gitaly-ruby:// scheme. The interface consists of the
// AddAddress and RemoveAddress methods. RemoveAddress returns a boolean
// indicating whether the address was removed; this is intended to give
// back-pressure against repeated process restarts.
//
// The gitaly-ruby:// scheme exists because that is the way we can
// interact with the internal client-side loadbalancer of grpc-go. A URL
// for this scheme would be gitaly-ruby://foobar. For gitaly-ruby://
// URL's, the host and port are ignored. So gitaly-ruby://foobar is
// actually a working, valid address.
//
// Strictly speaking this package implements a gRPC 'Resolver'. This
// resolver feeds address list updates to a gRPC 'balancer' which
// interacts with the gRPC client connection machinery. A resolver
// consists of a Builder which returns Resolver instances. Our Builder
// manages the address pool and notifies its Resolver instances of
// changes, which they then propagate into the gRPC library.
//

import (
	"time"

	"google.golang.org/grpc/resolver"
)

var (
	lbBuilder = newBuilder()
)

func init() {
	resolver.Register(lbBuilder)
}

const (
	// DefaultRemoveDelay is the minimum time between successive address removals.
	DefaultRemoveDelay = 1 * time.Minute
)

// AddAddress adds the address of a gitaly-ruby instance to the load
// balancer.
func AddAddress(a string) {
	lbBuilder.addAddress <- a
}

// RemoveAddress removes the address of a gitaly-ruby instance from the
// load balancer. Returns false if the pool is too small to remove the
// address.
func RemoveAddress(addr string) bool {
	ok := make(chan bool)
	lbBuilder.removeAddress <- addressRemoval{ok: ok, addr: addr}
	return <-ok
}

type addressRemoval struct {
	addr string
	ok   chan<- bool
}

type addressUpdate struct {
	addrs []resolver.Address
	next  chan struct{}
}

type config struct {
	numAddrs    int
	removeDelay time.Duration
}

type builder struct {
	addAddress     chan string
	removeAddress  chan addressRemoval
	addressUpdates chan addressUpdate
	configUpdate   chan config

	// for testing only
	testingRestart chan struct{}
}

// ConfigureBuilder changes the configuration of the global balancer
// instance. All calls that interact with the balancer will block until
// ConfigureBuilder has been called at least once.
func ConfigureBuilder(numAddrs int, removeDelay time.Duration) {
	cfg := config{
		numAddrs:    numAddrs,
		removeDelay: removeDelay,
	}

	if cfg.removeDelay <= 0 {
		cfg.removeDelay = DefaultRemoveDelay
	}
	if numAddrs <= 0 {
		panic("numAddrs must be at least 1")
	}

	lbBuilder.configUpdate <- cfg
}

func newBuilder() *builder {
	b := &builder{
		addAddress:     make(chan string),
		removeAddress:  make(chan addressRemoval),
		addressUpdates: make(chan addressUpdate),
		configUpdate:   make(chan config),
		testingRestart: make(chan struct{}),
	}
	go b.monitor()

	return b
}

// Scheme is the name of the address scheme that makes gRPC select this resolver.
const Scheme = "gitaly-ruby"

func (*builder) Scheme() string { return Scheme }

// Build ignores its resolver.Target argument. That means it does not
// care what "address" the caller wants to resolve. We always resolve to
// the current list of address for local gitaly-ruby processes.
func (b *builder) Build(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOption) (resolver.Resolver, error) {
	cc.NewServiceConfig(`{"LoadBalancingPolicy":"round_robin"}`)
	return newGitalyResolver(cc, b.addressUpdates), nil
}

// monitor serves address list requests and handles address updates.
func (b *builder) monitor() {
	p := newPool()
	notify := make(chan struct{})
	cfg := <-b.configUpdate
	lastRemoval := time.Now()

	// This channel is intentionally nil so that our 'select' below won't
	// send messages to it. We do this to prevent sending out invalid (empty)
	// messages during boot.
	var addressUpdates chan addressUpdate

	for {
		au := addressUpdate{next: notify}
		for _, a := range p.activeAddrs() {
			au.addrs = append(au.addrs, resolver.Address{Addr: a})
		}

		if len(au.addrs) > 0 && addressUpdates == nil {
			// Start listening for address update requests
			addressUpdates = b.addressUpdates
		}

		select {
		case addressUpdates <- au:
			// We have served an address update request
		case addr := <-b.addAddress:
			p.add(addr)

			notify = broadcast(notify)
		case removal := <-b.removeAddress:
			if time.Since(lastRemoval) < cfg.removeDelay || p.activeSize() < cfg.numAddrs-1 {
				removal.ok <- false
				break
			}

			if !p.remove(removal.addr) {
				removal.ok <- false
				break
			}

			removal.ok <- true
			lastRemoval = time.Now()
			notify = broadcast(notify)
		case cfg = <-b.configUpdate:
			// We have received a config update
		case <-b.testingRestart:
			go b.monitor()
			b.configUpdate <- cfg
			return
		}
	}
}

// broadcast returns a fresh channel because we can only close them once
func broadcast(ch chan struct{}) chan struct{} {
	close(ch)
	return make(chan struct{})
}

// gitalyResolver propagates address list updates to a
// resolver.ClientConn instance
type gitalyResolver struct {
	clientConn resolver.ClientConn

	started        chan struct{}
	done           chan struct{}
	resolveNow     chan struct{}
	addressUpdates chan addressUpdate
}

func newGitalyResolver(cc resolver.ClientConn, auCh chan addressUpdate) *gitalyResolver {
	r := &gitalyResolver{
		started:        make(chan struct{}),
		done:           make(chan struct{}),
		resolveNow:     make(chan struct{}),
		addressUpdates: auCh,
		clientConn:     cc,
	}
	go r.monitor()

	// Don't return until we have sent at least one address update. This is
	// meant to avoid panics inside the grpc-go library.
	<-r.started

	return r
}

func (r *gitalyResolver) ResolveNow(resolver.ResolveNowOption) {
	r.resolveNow <- struct{}{}
}

func (r *gitalyResolver) Close() {
	close(r.done)
}

func (r *gitalyResolver) monitor() {
	notify := r.sendUpdate()
	close(r.started)

	for {
		select {
		case <-notify:
			notify = r.sendUpdate()
		case <-r.resolveNow:
			notify = r.sendUpdate()
		case <-r.done:
			return
		}
	}
}

func (r *gitalyResolver) sendUpdate() chan struct{} {
	au := <-r.addressUpdates
	r.clientConn.NewAddress(au.addrs)
	return au.next
}