Welcome to mirror list, hosted at ThFree Co, Russian Federation.

worker.go « rubyserver « internal - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: a06e096182fe878a2045cb3d8fcc2ee52d098d45 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package rubyserver

import (
	"fmt"
	"syscall"
	"time"

	"gitlab.com/gitlab-org/gitaly/internal/config"
	"gitlab.com/gitlab-org/gitaly/internal/rubyserver/balancer"
	"gitlab.com/gitlab-org/gitaly/internal/supervisor"

	"github.com/prometheus/client_golang/prometheus"
	log "github.com/sirupsen/logrus"
)

// terminationCounter counts, labelled by worker name, how many times a
// gitaly-ruby process was sent termination signals because of excessive
// memory use.
var terminationCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{Name: "gitaly_ruby_memory_terminations_total", Help: "Number of times gitaly-ruby has been terminated because of excessive memory use."},
	[]string{"name"},
)

// init registers this file's metrics with the default Prometheus registry.
// MustRegister panics if registration fails.
func init() {
	prometheus.MustRegister(terminationCounter)
}

// worker observes the event stream of a supervised process and restarts
// it if necessary, in cooperation with the balancer. Concretely, it removes
// the process's address from the balancer and signals the process to exit
// when memory stays high; starting a replacement is presumably the
// supervisor's job (NOTE(review): confirm).
type worker struct {
	// The supervised process; its Name is used in log fields and as the
	// termination metric label.
	*supervisor.Process
	// address is the value added to / removed from the balancer for this worker.
	address string
	// events carries supervisor events for the process; consumed by monitor.
	events  <-chan supervisor.Event

	// This is for testing only, so that we can inject a fake balancer.
	// The first value received by monitor is the balancer it uses (newWorker
	// sends defaultBalancer); later values replace it.
	balancerUpdate chan balancerProxy
}

// newWorker wraps the supervised process p, whose gitaly-ruby endpoint is
// address, and starts the monitor goroutine that consumes events. It only
// returns once address has been registered with the balancer, so the caller
// may route requests to it right away.
func newWorker(p *supervisor.Process, address string, events <-chan supervisor.Event) *worker {
	w := &worker{Process: p, address: address, events: events}
	w.balancerUpdate = make(chan balancerProxy)

	go w.monitor()

	// The channel is unbuffered: this send completes only once monitor has
	// taken the balancer it is going to work with.
	var bal defaultBalancer
	w.balancerUpdate <- bal

	// Callers may start sending requests as soon as we return, and grpc-go
	// can panic if the balancer holds no addresses at that moment. Register
	// the address here instead of waiting for monitor to see an Up event.
	bal.AddAddress(w.address)

	return w
}

// balancerProxy is the part of the balancer API a worker relies on. It is an
// interface so tests can hand a fake to monitor via worker.balancerUpdate.
type balancerProxy interface {
	AddAddress(string)
	RemoveAddress(string) bool
}

// defaultBalancer satisfies balancerProxy by forwarding every call to the
// package-level functions of the balancer package.
type defaultBalancer struct{}

var _ balancerProxy = defaultBalancer{}

// AddAddress forwards addr to balancer.AddAddress.
func (defaultBalancer) AddAddress(addr string) {
	balancer.AddAddress(addr)
}

// RemoveAddress forwards addr to balancer.RemoveAddress and passes its
// result through; monitor only terminates a worker when this is true.
func (defaultBalancer) RemoveAddress(addr string) bool {
	removed := balancer.RemoveAddress(addr)
	return removed
}

// monitor is the worker's event loop. It first receives the balancer to use
// from w.balancerUpdate, then handles supervisor events forever:
//
//   - Up with a PID different from the tracked one: add w.address to the
//     balancer, track the new PID and reset the memory stopwatch.
//   - MemoryHigh for the tracked PID: once the stopwatch shows memory has been
//     high for longer than config.Config.Ruby.RestartDelay, try to remove the
//     worker from the balancer and, if that succeeds, schedule termination.
//   - MemoryLow for the tracked PID: reset the stopwatch.
//
// Events with a non-positive PID are logged and dropped, and any other event
// type panics. A further value on w.balancerUpdate replaces the balancer
// (test use only). monitor never returns normally.
func (w *worker) monitor() {
	memWatch := &stopwatch{}
	trackedPid := 0
	bal := <-w.balancerUpdate

	for {
		var ev supervisor.Event
		select {
		case ev = <-w.events:
		case bal = <-w.balancerUpdate:
			// For testing only.
			continue
		}

		if ev.Pid <= 0 {
			log.WithFields(log.Fields{
				"worker.name":      w.Name,
				"worker.event_pid": ev.Pid,
			}).Info("received invalid PID")
			continue
		}

		switch ev.Type {
		case supervisor.Up:
			// A repeated Up for the PID we already track must not reset our state.
			if ev.Pid != trackedPid {
				bal.AddAddress(w.address)
				trackedPid = ev.Pid
				memWatch.reset()
			}
		case supervisor.MemoryHigh:
			if ev.Pid != trackedPid {
				continue
			}

			memWatch.mark()

			// It is crucial to check the return value of RemoveAddress. If we
			// don't we may leave the system without the capacity to make
			// gitaly-ruby requests. RemoveAddress is only attempted once the
			// restart delay has been exceeded (&& short-circuits).
			if memWatch.elapsed() > config.Config.Ruby.RestartDelay && bal.RemoveAddress(w.address) {
				go w.waitTerminate(ev.Pid)
				memWatch.reset()
			}
		case supervisor.MemoryLow:
			if ev.Pid == trackedPid {
				memWatch.reset()
			}
		default:
			panic(fmt.Sprintf("unknown state %v", ev.Type))
		}
	}
}

// waitTerminate shuts down the gitaly-ruby process pid after it has been
// taken out of the balancer.
//
// Sequence:
//  1. Wait one minute so in-flight requests can still reach it.
//  2. Increment terminationCounter.
//  3. Send SIGTERM.
//  4. After config.Config.Ruby.GracefulRestartTimeout, send SIGKILL.
//
// Signal errors are logged instead of being discarded. If SIGTERM cannot be
// delivered (e.g. ESRCH because the process is already gone, or EPERM), the
// SIGKILL stage is skipped: escalating cannot help, and the PID of an exited
// process may already belong to an unrelated process. ESRCH from SIGKILL is
// the normal outcome when the process exited in response to SIGTERM, so it
// is not reported.
//
// NOTE(review): signalling by raw PID after the graceful timeout can still
// hit a recycled PID if the process exited and was reaped in the meantime; a
// process handle from the supervisor would close that gap.
func (w *worker) waitTerminate(pid int) {
	// Wait for in-flight requests to reach the worker before we slam the
	// door in their face.
	time.Sleep(1 * time.Minute)

	terminationCounter.WithLabelValues(w.Name).Inc()

	logger := log.WithFields(log.Fields{
		"worker.name": w.Name,
		"worker.pid":  pid,
	})

	logger.Info("sending SIGTERM")
	if err := syscall.Kill(pid, syscall.SIGTERM); err != nil {
		// Nothing to escalate: the process is gone or cannot be signalled.
		logger.WithError(err).Warn("sending SIGTERM failed, skipping SIGKILL")
		return
	}

	time.Sleep(config.Config.Ruby.GracefulRestartTimeout)

	logger.Info("sending SIGKILL")
	if err := syscall.Kill(pid, syscall.SIGKILL); err != nil && err != syscall.ESRCH {
		logger.WithError(err).Warn("sending SIGKILL failed")
	}
}