Welcome to mirror list, hosted at ThFree Co, Russian Federation.

bootstrap.go « bootstrap « internal - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 894139f235d8cbdd20664c9e279de0ba44a96f01 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
package bootstrap

import (
	"fmt"
	"net"
	"os"
	"os/signal"
	"syscall"

	"github.com/cloudflare/tableflip"
	"github.com/prometheus/client_golang/prometheus"
	log "github.com/sirupsen/logrus"
	"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
	"gitlab.com/gitlab-org/gitaly/v14/internal/helper/env"
	"golang.org/x/sys/unix"
)

const (
	// EnvPidFile names the environment variable holding the optional path of
	// the pid file that tableflip keeps up to date.
	EnvPidFile = "GITALY_PID_FILE"

	// EnvUpgradesEnabled names the environment variable that New parses as a
	// boolean to decide whether SIGHUP triggers a graceful upgrade.
	EnvUpgradesEnabled = "GITALY_UPGRADES_ENABLED"

	// socketReusePortWarning is logged whenever SO_REUSEPORT cannot be set on
	// a listening socket.
	socketReusePortWarning = "Unable to set SO_REUSEPORT: zero downtime upgrades will not work"
)

// Listener is the contract shared by the bootstrap managers of this package
// (Bootstrap and Noop): collect starters, run them, then block until the
// process has to stop.
type Listener interface {
	// RegisterStarter queues starter so that Start will run it.
	RegisterStarter(starter Starter)
	// Start runs every registered starter so they begin accepting
	// connections, returning the first setup error encountered.
	Start() error
	// Wait blocks until the process should stop and returns the error, if
	// any, that ended the wait. stopAction, when non-nil, may be invoked to
	// perform a graceful stop.
	Wait(gracePeriodTicker helper.Ticker, stopAction func()) error
}

// Bootstrap handles graceful upgrades: it owns the upgrader, runs the
// registered starters and decides when the process must exit.
type Bootstrap struct {
	// upgrader drives readiness, upgrades and the exit notification.
	upgrader upgrader

	// listenFunc creates listeners; New wires tableflip's Fds.Listen here.
	listenFunc ListenFunc

	// errChan collects runtime errors from starters; it is created by Start.
	errChan chan error

	// starters are appended by RegisterStarter and run in order by Start.
	starters []Starter

	// connTotal is handed unchanged to every starter.
	connTotal *prometheus.CounterVec
}

// upgrader is the subset of the tableflip upgrader API that Bootstrap relies
// on; New passes the value returned by tableflip.New, and _new accepts any
// implementation.
type upgrader interface {
	// Exit returns a channel that is closed once this process must terminate
	// because an upgrade completed.
	Exit() <-chan struct{}
	// HasParent reports whether this process was started by an upgrading parent.
	HasParent() bool
	// Ready signals that this process is ready to accept connections.
	Ready() error
	// Upgrade starts a graceful upgrade; Bootstrap calls it on SIGHUP.
	Upgrade() error
	// Stop is invoked by Bootstrap on an immediate shutdown.
	Stop()
}

// New performs tableflip initialization.
//
// Its configuration is read from the environment:
//   - EnvPidFile is optional; if provided, that file will always contain the
//     current process PID.
//   - EnvUpgradesEnabled controls the upgrade process on SIGHUP. gitaly-wrapper
//     is supposed to set it in order to enable graceful upgrades. A value that
//     cannot be parsed is logged and treated as false.
//
// First boot:
//   - gitaly starts as usual, we will refer to it as p1
//   - New will build a tableflip.Upgrader, we will refer to it as upg
//   - sockets and files must be opened with upg.Fds
//   - p1 will trap SIGHUP and invoke upg.Upgrade()
//   - when ready to accept incoming connections p1 will call upg.Ready()
//   - the upg.Exit() channel will be closed when an upgrade completed
//     successfully and the process must terminate
//
// Graceful upgrade:
//   - the user replaces the gitaly binary and/or config file
//   - the user sends SIGHUP to p1
//   - p1 will fork and exec the new gitaly, we will refer to it as p2
//   - from now on p1 will ignore other SIGHUPs
//   - if p2 terminates with a non-zero exit code, SIGHUP handling will be restored
//   - p2 will follow the "first boot" sequence but upg.Fds will provide sockets
//     and files from p1, when available
//   - when p2 invokes upg.Ready() all the shared file descriptors not claimed by
//     p2 will be closed
//   - the upg.Exit() channel in p1 will be closed now and p1 can gracefully
//     terminate already accepted connections
//   - upgrades cannot start again while p1 and p2 are both running, so a hard
//     termination should be scheduled to overcome freezes during a graceful
//     shutdown
func New(totalConn *prometheus.CounterVec) (*Bootstrap, error) {
	pidFile := os.Getenv(EnvPidFile)

	upgradesEnabled, err := env.GetBool(EnvUpgradesEnabled, false)
	if err != nil {
		// A malformed value must not prevent gitaly from booting, but silently
		// discarding the error would hide why upgrades never happen.
		log.WithError(err).Warnf("invalid value for %s: graceful upgrades disabled", EnvUpgradesEnabled)
		upgradesEnabled = false
	}

	// PIDFile is optional, if provided tableflip will keep it updated
	upg, err := tableflip.New(tableflip.Options{
		PIDFile: pidFile,
		ListenConfig: &net.ListenConfig{
			// Without SO_REUSEPORT zero-downtime upgrades do not work, but
			// listening itself can still succeed, so failures are only logged.
			Control: func(network, address string, c syscall.RawConn) error {
				var opErr error
				if controlErr := c.Control(func(fd uintptr) {
					opErr = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEPORT, 1)
				}); controlErr != nil {
					log.WithError(controlErr).Warn(socketReusePortWarning)
				}
				if opErr != nil {
					log.WithError(opErr).Warn(socketReusePortWarning)
				}
				return nil
			},
		},
	})
	if err != nil {
		return nil, err
	}

	return _new(upg, upg.Fds.Listen, upgradesEnabled, totalConn)
}

// _new builds a Bootstrap around an already constructed upgrader. When
// upgradesEnabled is true it also starts a goroutine that calls upg.Upgrade on
// every SIGHUP for the rest of the process lifetime and logs the outcome of
// each attempt. The returned error is always nil.
func _new(upg upgrader, listenFunc ListenFunc, upgradesEnabled bool, totalConn *prometheus.CounterVec) (*Bootstrap, error) {
	b := &Bootstrap{
		upgrader:   upg,
		listenFunc: listenFunc,
		connTotal:  totalConn,
	}

	if !upgradesEnabled {
		return b, nil
	}

	go func() {
		hup := make(chan os.Signal, 1)
		signal.Notify(hup, syscall.SIGHUP)

		for range hup {
			if err := upg.Upgrade(); err != nil {
				log.WithError(err).Error("Upgrade failed")
			} else {
				log.Info("Upgrade succeeded")
			}
		}
	}()

	return b, nil
}

// ListenFunc creates a net.Listener for a network (such as "unix") and an
// address; net.Listen itself has this signature.
type ListenFunc func(network, addr string) (net.Listener, error)

// Starter initializes a net.Listener and starts serving on it.
//
// It receives the ListenFunc to create the listener with, a channel on which
// runtime errors must be reported, and the *prometheus.CounterVec given to the
// bootstrapper (presumably a connection counter; confirm with callers).
// Incoming connections must be served asynchronously; the returned error is
// reserved for setup failures.
type Starter func(listen ListenFunc, errCh chan<- error, connTotal *prometheus.CounterVec) error

// isFirstBoot reports whether this process was started directly, rather than
// by a parent performing a graceful upgrade.
func (b *Bootstrap) isFirstBoot() bool {
	hasParent := b.upgrader.HasParent()
	return !hasParent
}

// RegisterStarter appends starter to the list that Start runs, preserving
// registration order.
func (b *Bootstrap) RegisterStarter(starter Starter) {
	registered := b.starters
	b.starters = append(registered, starter)
}

// Start creates the runtime error channel, with room for one error per
// starter, and then invokes every registered starter in registration order.
// Each starter receives b.listen, so its listeners go through the bootstrap's
// listen function. The first setup error aborts Start and is returned as is;
// runtime errors are collected later by Wait.
func (b *Bootstrap) Start() error {
	b.errChan = make(chan error, len(b.starters))

	for _, starter := range b.starters {
		setupErr := starter(b.listen, b.errChan, b.connTotal)
		if setupErr != nil {
			return setupErr
		}
	}

	return nil
}

// Wait signals process readiness to the parent and then waits for an exit
// condition.
//
// SIGTERM, SIGINT and a runtime error reported by a starter trigger an
// immediate shutdown. In case of an upgrade there will be a grace period to
// complete the ongoing requests; the error describing how that grace period
// ended is wrapped with %w, so callers can inspect it with errors.Is/As.
//
// stopAction will be invoked during a graceful stop. It must wait until the
// shutdown is completed.
func (b *Bootstrap) Wait(gracePeriodTicker helper.Ticker, stopAction func()) error {
	signals := []os.Signal{syscall.SIGTERM, syscall.SIGINT}
	immediateShutdown := make(chan os.Signal, len(signals))
	signal.Notify(immediateShutdown, signals...)

	if err := b.upgrader.Ready(); err != nil {
		return err
	}

	select {
	case <-b.upgrader.Exit():
		// this is the old process and a graceful upgrade is in progress
		// the new process signaled its readiness and we started a graceful stop
		// however no further upgrades can be started until this process is running
		// we set a grace period and then we force a termination.
		waitErr := b.waitGracePeriod(gracePeriodTicker, immediateShutdown, stopAction)
		return fmt.Errorf("graceful upgrade: %w", waitErr)
	case s := <-immediateShutdown:
		err := fmt.Errorf("received signal %q", s)
		b.upgrader.Stop()
		return err
	case err := <-b.errChan:
		return err
	}
}

// waitGracePeriod runs stopAction (when non-nil) in a separate goroutine,
// resets the grace period ticker and then waits for the first of: the ticker
// firing, a value arriving on kill, or stopAction returning. The returned
// error is always non-nil and only describes which of those ended the wait.
func (b *Bootstrap) waitGracePeriod(gracePeriodTicker helper.Ticker, kill <-chan os.Signal, stopAction func()) error {
	log.Warn("starting grace period")

	stopped := make(chan struct{})
	go func() {
		if stopAction != nil {
			stopAction()
		}
		close(stopped)
	}()

	gracePeriodTicker.Reset()

	var outcome string
	select {
	case <-gracePeriodTicker.C():
		outcome = "grace period expired"
	case <-kill:
		outcome = "force shutdown"
	case <-stopped:
		outcome = "completed"
	}

	return fmt.Errorf("%s", outcome)
}

// listen delegates to b.listenFunc. On first boot, any existing file at path
// is removed before listening on a unix socket. After an upgrade the socket
// may be provided by the parent process, so it is left in place.
func (b *Bootstrap) listen(network, path string) (net.Listener, error) {
	removeExisting := network == "unix" && b.isFirstBoot()
	if removeExisting {
		if err := os.RemoveAll(path); err != nil {
			return nil, err
		}
	}

	return b.listenFunc(network, path)
}

// Noop is a bootstrapper that does no additional configuration: its starters
// listen through plain net.Listen and no upgrade handling takes place.
type Noop struct {
	// starters are appended by RegisterStarter and run in order by Start.
	starters []Starter

	// shutdown is closed by Terminate to release Wait.
	shutdown chan struct{}

	// errChan collects runtime errors from starters; it is created by Start.
	errChan chan error

	// connTotal is handed unchanged to every starter.
	connTotal *prometheus.CounterVec
}

// NewNoop returns a ready-to-use *Noop whose starters will receive connTotal
// and whose Wait can be released through Terminate.
func NewNoop(connTotal *prometheus.CounterVec) *Noop {
	n := &Noop{connTotal: connTotal}
	n.shutdown = make(chan struct{})
	return n
}

// RegisterStarter appends starter to the list that Start runs, preserving
// registration order.
func (n *Noop) RegisterStarter(starter Starter) {
	registered := n.starters
	n.starters = append(registered, starter)
}

// Start creates the runtime error channel, with room for one error per
// starter, and invokes every registered starter in order with net.Listen as
// its listen function. The first setup error aborts Start and is returned.
func (n *Noop) Start() error {
	errCh := make(chan error, len(n.starters))
	n.errChan = errCh

	for _, starter := range n.starters {
		if setupErr := starter(net.Listen, errCh, n.connTotal); setupErr != nil {
			return setupErr
		}
	}

	return nil
}

// Wait blocks until either a starter reports a runtime error, which is
// returned, or Terminate is called. In the latter case stopAction, when
// non-nil, is run synchronously and nil is returned. The ticker is unused.
func (n *Noop) Wait(_ helper.Ticker, stopAction func()) error {
	select {
	case runtimeErr := <-n.errChan:
		return runtimeErr
	case <-n.shutdown:
	}

	if stopAction != nil {
		stopAction()
	}

	return nil
}

// Terminate unblocks Wait, which then executes the stopAction call-back passed
// into it.
//
// Calling Terminate again after it has returned is a no-op rather than a
// "close of closed channel" panic. The check is not atomic, so concurrent
// callers must still coordinate among themselves.
func (n *Noop) Terminate() {
	select {
	case <-n.shutdown:
		// Already terminated: shutdown is closed.
	default:
		close(n.shutdown)
	}
}