Welcome to mirror list, hosted at ThFree Co, Russian Federation.

checks.go « service « praefect « internal - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 75ac9055d935576b7d59835dbd3994097690714d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
package service

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"io"
	"os"
	"sort"
	"time"

	migrate "github.com/rubenv/sql-migrate"
	gitalyauth "gitlab.com/gitlab-org/gitaly/v15/auth"
	"gitlab.com/gitlab-org/gitaly/v15/client"
	"gitlab.com/gitlab-org/gitaly/v15/internal/helper"
	"gitlab.com/gitlab-org/gitaly/v15/internal/helper/env"
	"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config"
	"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore"
	"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore/glsql"
	"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore/migrations"
	"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/nodes"
	"gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
	"gitlab.com/gitlab-org/labkit/correlation"
	"golang.org/x/sync/errgroup"
	"google.golang.org/grpc"
	"google.golang.org/protobuf/types/known/durationpb"
)

// Severity is a type that indicates the severity of a check.
type Severity string

// Both severity levels are typed as Severity so that each constant carries the
// same type, whether it is assigned to a Check, compared, or stored in an interface.
const (
	// Warning indicates a severity level of warning
	Warning Severity = "warning"
	// Fatal indicates a severity level of fatal
	// any checks that are Fatal will prevent Praefect from starting up
	Fatal Severity = "fatal"
)

// Check is a struct representing a check on the health of a Gitaly cluster's setup. These are separate from the "healthcheck"
// concept which is more concerned with the health of the praefect service. These checks are meant to diagnose any issues with
// the praefect cluster setup itself and will be run on startup/restarts.
type Check struct {
	// Run performs the check. A nil error means the check passed; a non-nil
	// error describes why it failed.
	Run func(ctx context.Context) error
	// Name is a short human-readable label for the check.
	Name string
	// Description explains what the check verifies.
	Description string
	// Severity states how a failure of this check should be treated
	// (Warning or Fatal).
	Severity Severity
}

// CheckFunc is a function type that takes a praefect config and returns a Check.
// The constructors in this file write progress messages to w unless quiet is true.
type CheckFunc func(conf config.Config, w io.Writer, quiet bool) *Check

// AllChecks returns slice of all checks that can be executed for praefect.
//
// Every element is a check constructor. The clock synchronization constructor
// is bound to helper.CheckClockSync, which is what verifies the clock of the
// Praefect host itself.
func AllChecks() []CheckFunc {
	clockSync := NewClockSyncCheck(helper.CheckClockSync)

	checks := []CheckFunc{
		NewPraefectMigrationCheck,
		NewGitalyNodeConnectivityCheck,
		NewPostgresReadWriteCheck,
		NewUnavailableReposCheck,
		clockSync,
	}

	return checks
}

// NewPraefectMigrationCheck returns a Check that checks if all praefect migrations have run.
//
// The check plans an "up" migration against the configured database using the
// in-memory migration set; any planned migration means the schema is behind and
// the check fails. w and quiet are accepted to satisfy CheckFunc but are unused.
func NewPraefectMigrationCheck(conf config.Config, w io.Writer, quiet bool) *Check {
	run := func(ctx context.Context) error {
		// The 30 second deadline is only handed to the call that opens the database.
		dialCtx, cancelDial := context.WithTimeout(ctx, 30*time.Second)
		defer cancelDial()

		db, err := glsql.OpenDB(dialCtx, conf.DB)
		if err != nil {
			return err
		}
		defer db.Close()

		set := migrate.MigrationSet{TableName: migrations.MigrationTableName}
		source := &migrate.MemoryMigrationSource{Migrations: migrations.All()}

		pending, _, err := set.PlanMigration(db, "postgres", source, migrate.Up, 0)
		if err != nil {
			return err
		}

		if count := len(pending); count > 0 {
			return fmt.Errorf("%d migrations have not been run", count)
		}
		return nil
	}

	return &Check{
		Name:        "praefect migrations",
		Description: "confirms whether or not all praefect migrations have run",
		Run:         run,
		Severity:    Fatal,
	}
}

// NewGitalyNodeConnectivityCheck returns a check that ensures Praefect can talk to all nodes of all virtual storages.
//
// The work is delegated to nodes.PingAll. Its output goes through a text printer
// wrapping w, and quiet is forwarded unchanged.
func NewGitalyNodeConnectivityCheck(conf config.Config, w io.Writer, quiet bool) *Check {
	check := &Check{
		Name: "gitaly node connectivity & disk access",
		Description: "confirms if praefect can reach all of its gitaly nodes, and " +
			"whether or not the gitaly nodes can read/write from and to its storages.",
		Severity: Fatal,
	}

	check.Run = func(ctx context.Context) error {
		printer := nodes.NewTextPrinter(w)
		return nodes.PingAll(ctx, conf, printer, quiet)
	}

	return check
}

// NewPostgresReadWriteCheck returns a check that ensures Praefect can read and write to the database.
//
// Inside a single transaction the check selects from and then inserts one row
// into the hello_world table. The transaction is never committed; it is always
// rolled back when the check returns. Progress is written to w unless quiet is set.
func NewPostgresReadWriteCheck(conf config.Config, w io.Writer, quiet bool) *Check {
	run := func(ctx context.Context) error {
		db, err := glsql.OpenDB(ctx, conf.DB)
		if err != nil {
			return fmt.Errorf("error opening database connection: %w", err)
		}
		defer db.Close()

		tx, err := db.BeginTx(ctx, nil)
		if err != nil {
			return fmt.Errorf("error starting transaction: %w", err)
		}
		// The rollback result is deliberately ignored; the check only needs the
		// transaction to be discarded.
		//nolint: errcheck
		defer tx.Rollback()

		// An empty table is fine for the read probe; only real read errors fail it.
		var id int
		readErr := tx.QueryRowContext(ctx, "SELECT id FROM hello_world").Scan(&id)
		if readErr != nil && !errors.Is(readErr, sql.ErrNoRows) {
			return fmt.Errorf("error reading from table: %w", readErr)
		}
		logMessage(quiet, w, "successfully read from database")

		res, err := tx.ExecContext(ctx, "INSERT INTO hello_world (id) VALUES(1)")
		if err != nil {
			return fmt.Errorf("error writing to table: %w", err)
		}

		inserted, err := res.RowsAffected()
		switch {
		case err != nil:
			return err
		case inserted != 1:
			return errors.New("failed to insert row")
		}
		logMessage(quiet, w, "successfully wrote to database")

		return nil
	}

	return &Check{
		Name:        "database read/write",
		Description: "checks if praefect can write/read to and from the database",
		Run:         run,
		Severity:    Fatal,
	}
}

// NewUnavailableReposCheck returns a check that finds the number of repositories without a valid primary.
//
// For every configured virtual storage that has unavailable repositories, a line
// with the count is written to w (unless quiet is set), ordered by virtual storage
// name so the output is identical across runs. The check returns an error whenever
// at least one virtual storage is reported.
func NewUnavailableReposCheck(conf config.Config, w io.Writer, quiet bool) *Check {
	return &Check{
		Name:        "unavailable repositories",
		Description: "lists repositories that are missing a valid primary, hence rendering them unavailable",
		Run: func(ctx context.Context) error {
			db, err := glsql.OpenDB(ctx, conf.DB)
			if err != nil {
				return fmt.Errorf("error opening database connection: %w", err)
			}
			defer db.Close()

			unavailableRepositories, err := datastore.CountUnavailableRepositories(
				ctx,
				db,
				conf.VirtualStorageNames(),
			)
			if err != nil {
				return err
			}

			if len(unavailableRepositories) == 0 {
				logMessage(quiet, w, "All repositories are available.")
				return nil
			}

			// Go randomizes map iteration order; sort the virtual storage names so
			// repeated runs of the check print their report in the same order.
			virtualStorages := make([]string, 0, len(unavailableRepositories))
			for virtualStorage := range unavailableRepositories {
				virtualStorages = append(virtualStorages, virtualStorage)
			}
			sort.Strings(virtualStorages)

			for _, virtualStorage := range virtualStorages {
				unavailableCount := unavailableRepositories[virtualStorage]
				format := "virtual-storage %q has %d repositories that are unavailable."
				if unavailableCount == 1 {
					format = "virtual-storage %q has %d repository that is unavailable."
				}
				logMessage(quiet, w, format, virtualStorage, unavailableCount)
			}

			return errors.New("repositories unavailable")
		},
		Severity: Warning,
	}
}

// NewClockSyncCheck returns a function that returns a check that verifies if system clock is in sync.
//
// clockDriftCheck verifies the clock of the host Praefect runs on. In addition,
// every Gitaly node listed in the virtual storage configuration is asked through
// its ClockSynced RPC. All probes run concurrently, and the check fails with the
// first error reported. The NTP_HOST and DRIFT_THRESHOLD environment variables
// are read each time the check runs.
func NewClockSyncCheck(clockDriftCheck func(ntpHost string, driftThreshold time.Duration) (bool, error)) func(_ config.Config, _ io.Writer, _ bool) *Check {
	return func(conf config.Config, w io.Writer, quiet bool) *Check {
		return &Check{
			Name: "clock synchronization",
			Description: "checks if system clock is in sync with NTP service. " +
				"You can use NTP_HOST env var to provide NTP service URL to query and " +
				"DRIFT_THRESHOLD to provide allowed drift as a duration (1ms, 20sec, etc.)",
			Severity: Fatal,
			Run: func(ctx context.Context) error {
				const driftThresholdEnvName = "DRIFT_THRESHOLD"
				driftThreshold, err := env.GetDuration(driftThresholdEnvName, time.Minute)
				if err != nil {
					// Keep the underlying parse error so the user can see what was wrong with the value.
					return fmt.Errorf("env var %s expected to be a duration (5s, 100ms, etc.): %w", driftThresholdEnvName, err)
				}
				// A sub-millisecond threshold such as 10ns would be reported as 0ms below,
				// and a negative threshold is meaningless, so reject both with a detailed error.
				if driftThreshold != 0 && driftThreshold < time.Millisecond {
					return fmt.Errorf("env var %s expected to be 0 or at least 1ms", driftThresholdEnvName)
				}

				ntpHost := os.Getenv("NTP_HOST")
				// The correlation ID is printed and attached to ctx so that Gitaly-side logs can be
				// matched with this run (presumably propagated by the client's interceptors).
				correlationID := correlation.SafeRandomID()
				ctx = correlation.ContextWithCorrelation(ctx, correlationID)

				logMessage(quiet, w, "checking with NTP service at %s and allowed clock drift %d ms [correlation_id: %s]", ntpHost, driftThreshold.Milliseconds(), correlationID)

				// The errgroup context is cancelled as soon as any probe fails, which aborts the
				// remaining dials and RPCs.
				g, ctx := errgroup.WithContext(ctx)
				g.Go(func() error {
					synced, err := clockDriftCheck(ntpHost, driftThreshold)
					if err != nil {
						return fmt.Errorf("praefect: %w", err)
					}
					if !synced {
						return errors.New("praefect: clock is not synced")
					}
					return nil
				})

				for i := range conf.VirtualStorages {
					for j := range conf.VirtualStorages[i].Nodes {
						// Per-iteration copy so each goroutine sees its own node.
						node := conf.VirtualStorages[i].Nodes[j]
						g.Go(func() error {
							// WithBlock makes the dial wait until the connection is up (or ctx is done),
							// so connectivity problems are reported here rather than on the RPC.
							opts := []grpc.DialOption{grpc.WithBlock()}
							if len(node.Token) > 0 {
								opts = append(opts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(node.Token)))
							}

							cc, err := client.DialContext(ctx, node.Address, opts)
							if err != nil {
								return fmt.Errorf("%s machine: %w", node.Address, err)
							}
							defer func() { _ = cc.Close() }()

							serverServiceClient := gitalypb.NewServerServiceClient(cc)
							resp, err := serverServiceClient.ClockSynced(ctx, &gitalypb.ClockSyncedRequest{
								NtpHost: ntpHost, DriftThreshold: durationpb.New(driftThreshold),
							})
							if err != nil {
								return fmt.Errorf("gitaly node at %s: %w", node.Address, err)
							}
							if !resp.GetSynced() {
								return fmt.Errorf("gitaly node at %s: clock is not synced", node.Address)
							}
							return nil
						})
					}
				}
				return g.Wait()
			},
		}
	}
}

// logMessage writes a newline-terminated, fmt-formatted message to w unless
// quiet is true. Write errors are ignored: output here is purely informational.
func logMessage(quiet bool, w io.Writer, format string, a ...interface{}) {
	if !quiet {
		fmt.Fprintf(w, format+"\n", a...)
	}
}