Welcome to mirror list, hosted at ThFree Co, Russian Federation.

db.go « testdb « testhelper « internal - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: f085ebc582ad9d42eb81fa1527d7b3adde096e90 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
package testdb

import (
	"context"
	"database/sql"
	"errors"
	"net"
	"os"
	"os/exec"
	"strconv"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/google/uuid"
	migrate "github.com/rubenv/sql-migrate"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/migrations"
	"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
)

// advisoryLockIDDatabaseTemplate is the Postgres advisory lock key that serialises work on the
// shared template database between concurrently running tests.
const advisoryLockIDDatabaseTemplate = 1627644550

// praefectTemplateDatabase is the database that receives all migrations and is then used as the
// TEMPLATE from which every per-test database is created.
const praefectTemplateDatabase = "praefect_template"

// TxWrapper is a simple wrapper around *sql.Tx. Its Rollback and Commit fail the test on error
// and clear the wrapped transaction afterwards, so calling them again is a no-op.
type TxWrapper struct {
	*sql.Tx
}

// Rollback rolls back the wrapped *sql.Tx if one is still set and then clears it. A failing
// rollback fails the test immediately. Because a cleared wrapper ignores further calls,
// `defer tx.Rollback(t)` is safe even after an explicit Commit or Rollback.
func (txw *TxWrapper) Rollback(t testing.TB) {
	t.Helper()
	if txw.Tx == nil {
		return
	}
	require.NoError(t, txw.Tx.Rollback())
	txw.Tx = nil
}

// Commit commits the wrapped *sql.Tx if one is still set and then clears it. A failing commit
// fails the test immediately. Clearing Tx turns a later (typically deferred) Rollback into a
// no-op.
func (txw *TxWrapper) Commit(t testing.TB) {
	t.Helper()
	if txw.Tx == nil {
		return
	}
	require.NoError(t, txw.Tx.Commit())
	txw.Tx = nil
}

// DB is a helper struct that should be used only for testing purposes.
// It embeds the *sql.DB connection pool, so all of its methods can be called directly.
type DB struct {
	*sql.DB
	// Name is a name of the database.
	Name string
}

// Begin starts a new transaction and returns it wrapped into TxWrapper.
// The test fails immediately if the transaction cannot be started.
func (db DB) Begin(t testing.TB) *TxWrapper {
	t.Helper()
	tx, err := db.DB.Begin()
	require.NoError(t, err)
	return &TxWrapper{Tx: tx}
}

// Truncate deletes all rows from each of the given tables and then resets every sequence
// listed in pg_class (not only those owned by `tables`) so that its next value is 1 again.
//
// Table names are concatenated into the SQL verbatim; pass only trusted, hard-coded names.
func (db DB) Truncate(t testing.TB, tables ...string) {
	t.Helper()

	for _, table := range tables {
		_, err := db.DB.Exec("DELETE FROM " + table)
		// Report the table that actually failed rather than the whole list.
		require.NoError(t, err, "database cleanup failed: %s", table)
	}

	// setval(seq, 1, false) marks the sequence as "not yet called", so the next nextval returns 1.
	_, err := db.DB.Exec("SELECT setval(relname::TEXT, 1, false) from pg_class where relkind = 'S'")
	require.NoError(t, err, "database cleanup failed: %s", tables)
}

// RequireRowsInTable verifies that `tname` table has `n` amount of rows in it.
// tname is concatenated into the SQL verbatim; pass only trusted, hard-coded names.
//
// It accepts testing.TB like the other helpers in this package (so it also works in
// benchmarks); callers passing *testing.T are unaffected.
func (db DB) RequireRowsInTable(t testing.TB, tname string, n int) {
	t.Helper()

	var count int
	require.NoError(t, db.QueryRow("SELECT COUNT(*) FROM "+tname).Scan(&count))
	require.Equal(t, n, count, "unexpected amount of rows in table: %d instead of %d", count, n)
}

// TruncateAll removes all data from known set of tables and resets all sequences
// (see Truncate).
func (db DB) TruncateAll(t testing.TB) {
	t.Helper()

	db.Truncate(t,
		"replication_queue_job_lock",
		"replication_queue",
		"replication_queue_lock",
		"node_status",
		"shard_primaries",
		"storage_repositories",
		"repositories",
		"virtual_storages",
		"repository_assignments",
		"storage_cleanups",
	)
}

// MustExec executes `q` with `args` and fails the test immediately on error.
func (db DB) MustExec(t testing.TB, q string, args ...interface{}) {
	t.Helper()

	_, err := db.DB.Exec(q, args...)
	require.NoError(t, err)
}

// Close releases the underlying connection pool. It does not drop the database; that happens
// in the test cleanup registered when the database was created by New.
func (db DB) Close() error {
	if err := db.DB.Close(); err != nil {
		return errors.New("failed to release connection pool: " + err.Error())
	}
	return nil
}

// New creates a brand-new database and returns a wrapper around a connection pool to it.
// Must be used only for testing.
//
// Every call creates its own database, named "praefect_" followed by a random UUID without
// dashes, by cloning the migrated template database. The database is dropped again during test
// cleanup.
//
// Connection settings come from the output of `gdk env` when available, falling back to the
// process environment:
//   PGHOST - required, URL/socket/dir
//   PGPORT - required, binding port
//   PGUSER - optional, user - `$ whoami` would be used if not provided
//   PGHOST_PGBOUNCER, PGPORT_PGBOUNCER - optional, route test connections through PgBouncer
func New(t testing.TB) DB {
	t.Helper()

	suffix := strings.ReplaceAll(uuid.New().String(), "-", "")
	name := "praefect_" + suffix
	pool := initPraefectDB(t, name)

	return DB{Name: name, DB: pool}
}

// GetConfig returns the configuration for connecting to `database`. It is derived from the
// output of `gdk env` when available, otherwise from the process environment; see New for the
// list of variables.
//
// PGHOST and PGPORT are required and populate both the regular (Host/Port) and the
// SessionPooled connection settings. If PGHOST_PGBOUNCER and/or PGPORT_PGBOUNCER are set, they
// override only the regular Host/Port, while SessionPooled keeps using PGHOST/PGPORT.
func GetConfig(t testing.TB, database string) config.DB {
	t.Helper()

	env := getDatabaseEnvironment(t)

	require.Contains(t, env, "PGHOST", "PGHOST env var expected to be provided to connect to Postgres database")
	require.Contains(t, env, "PGPORT", "PGPORT env var expected to be provided to connect to Postgres database")

	portNumber, err := strconv.Atoi(env["PGPORT"])
	require.NoError(t, err, "PGPORT must be a port number of the Postgres database listens for incoming connections")

	conf := config.DB{
		Host:    env["PGHOST"],
		Port:    portNumber,
		DBName:  database,
		SSLMode: "disable",
		User:    env["PGUSER"],
		SessionPooled: config.DBConnection{
			Host: env["PGHOST"],
			Port: portNumber,
		},
	}

	// Optionally route the regular connection through PgBouncer.
	if bouncerHost, ok := env["PGHOST_PGBOUNCER"]; ok {
		conf.Host = bouncerHost
	}

	if bouncerPort, ok := env["PGPORT_PGBOUNCER"]; ok {
		bouncerPortNumber, err := strconv.Atoi(bouncerPort)
		require.NoError(t, err, "PGPORT_PGBOUNCER must be a port number of the PgBouncer")
		conf.Port = bouncerPortNumber
	}

	return conf
}

// requireSQLOpen opens a "pgx" connection pool for dbCfg and verifies it with a ping.
// direct is forwarded to glsql.DSN; callers in this file pass true when they must bypass
// PgBouncer.
//
// If opening or pinging fails, the pool is released and the test fails immediately. Callers
// therefore never receive a closed or unusable *sql.DB.
func requireSQLOpen(t testing.TB, dbCfg config.DB, direct bool) *sql.DB {
	t.Helper()

	db, err := sql.Open("pgx", glsql.DSN(dbCfg, direct))
	require.NoErrorf(t, err, "failed to connect to %q database", dbCfg.DBName)

	if err := db.Ping(); err != nil {
		// Release the pool first (non-fatally) so that aborting the test does not leak it,
		// then fail with the actual ping error.
		assert.NoErrorf(t, db.Close(), "release connection to the %q database", dbCfg.DBName)
		require.NoErrorf(t, err, "failed to communicate with %q database", dbCfg.DBName)
	}

	return db
}

// requireTerminateAllConnections asks Postgres to terminate every backend connected to
// `database`. It then waits up to 20 seconds until no backend other than db's own session is
// listed for that database in pg_stat_activity, failing the test otherwise.
//
// db is expected to be connected to a different database (callers use "postgres"). The
// terminate statement does not exclude the current session, so it would target db's own
// connection as well.
func requireTerminateAllConnections(t testing.TB, db *sql.DB, database string) {
	t.Helper()

	_, err := db.Exec("SELECT PG_TERMINATE_BACKEND(pid) FROM PG_STAT_ACTIVITY WHERE datname = $1", database)
	require.NoError(t, err)

	// Once the pg_terminate_backend has completed, we may need to wait before the connections
	// are fully released. pg_terminate_backend will return true as long as the signal was
	// sent successfully, but the backend needs to respond to the signal to close the connection.
	// TODO: In Postgre 14, pg_terminate_backend takes an optional timeout argument that makes it a blocking
	// call. https://gitlab.com/gitlab-org/gitaly/-/issues/3937 tracks the refactor work to remove this
	// polling loop in favor of passing in a timeout to pg_terminate_backend.
	//
	// The polling deliberately runs on the test goroutine instead of inside require.Eventually.
	// Eventually evaluates its condition on a separate goroutine, where require's FailNow must
	// not be called.
	const (
		waitFor = 20 * time.Second
		tick    = 10 * time.Millisecond
	)
	deadline := time.Now().Add(waitFor)
	for {
		var openConnections int
		require.NoError(t, db.QueryRow(
			`SELECT COUNT(*) FROM pg_stat_activity
				WHERE datname = $1 AND pid != pg_backend_pid()`, database).
			Scan(&openConnections))
		if openConnections == 0 {
			return
		}

		require.Truef(t, time.Now().Before(deadline),
			"wait for all connections to be terminated: %d still open", openConnections)
		time.Sleep(tick)
	}
}

// initPraefectDB creates a fresh, fully migrated database named `database` and returns a
// connection pool to it (through PgBouncer when PGHOST_PGBOUNCER/PGPORT_PGBOUNCER are set).
//
// Migrations are applied only to the shared template database praefectTemplateDatabase. Every
// test database is then created with CREATE DATABASE ... TEMPLATE. Work on the template is
// serialised between concurrently running tests with a Postgres advisory lock. If the template
// contains a migration unknown to this code (e.g. created from another branch), it is dropped
// and re-created.
//
// Cleanups registered on t close the returned pool and drop the database once the test finishes.
func initPraefectDB(t testing.TB, database string) *sql.DB {
	t.Helper()

	// dbCfg always targets the "postgres" maintenance database; derived configs are made
	// from copies so that deferred calls and cleanups see a stable value.
	dbCfg := GetConfig(t, "postgres")
	// We require a direct connection to the Postgres instance and not through the PgBouncer
	// because we use transaction pool mode for it and it doesn't work well for system advisory locks.
	postgresDB := requireSQLOpen(t, dbCfg, true)
	defer func() { require.NoErrorf(t, postgresDB.Close(), "release connection to the %q database", dbCfg.DBName) }()

	// Acquire exclusive advisory lock to prevent other concurrent test from doing the same.
	_, err := postgresDB.Exec(`SELECT pg_advisory_lock($1)`, advisoryLockIDDatabaseTemplate)
	require.NoError(t, err, "not able to acquire lock for synchronisation")
	// advisoryUnlock releases the lock and then replaces itself with a no-op. The deferred call
	// below therefore does nothing once the lock has been released explicitly further down.
	var advisoryUnlock func()
	advisoryUnlock = func() {
		require.True(t, scanSingleBool(t, postgresDB, `SELECT pg_advisory_unlock($1)`, advisoryLockIDDatabaseTemplate), "release advisory lock")
		advisoryUnlock = func() {}
	}
	defer func() { advisoryUnlock() }()

	if !databaseExist(t, postgresDB, praefectTemplateDatabase) {
		_, err := postgresDB.Exec("CREATE DATABASE " + praefectTemplateDatabase + " WITH ENCODING 'UTF8'")
		require.NoErrorf(t, err, "failed to create %q database", praefectTemplateDatabase)
	}

	templateDBConf := GetConfig(t, praefectTemplateDatabase)
	templateDB := requireSQLOpen(t, templateDBConf, true)
	defer func() {
		require.NoErrorf(t, templateDB.Close(), "release connection to the %q database", templateDBConf.DBName)
	}()

	if _, err := glsql.Migrate(templateDB, false); err != nil {
		// If database has unknown migration we try to re-create template database with
		// current migration. It may be caused by other code changes done in another branch.
		// Any other migration error fails the test right away.
		var pErr *migrate.PlanError
		if !errors.As(err, &pErr) || !strings.EqualFold(pErr.ErrorMessage, "unknown migration in database") {
			require.NoErrorf(t, err, "failed to run database migration on %q", praefectTemplateDatabase)
		}

		// Our own connection to the template must be gone before it can be dropped.
		require.NoErrorf(t, templateDB.Close(), "release connection to the %q database", templateDBConf.DBName)

		_, err = postgresDB.Exec("DROP DATABASE " + praefectTemplateDatabase)
		require.NoErrorf(t, err, "failed to drop %q database", praefectTemplateDatabase)
		_, err = postgresDB.Exec("CREATE DATABASE " + praefectTemplateDatabase + " WITH ENCODING 'UTF8'")
		require.NoErrorf(t, err, "failed to create %q database", praefectTemplateDatabase)

		remigrateTemplateDB := requireSQLOpen(t, templateDBConf, true)
		defer func() {
			require.NoErrorf(t, remigrateTemplateDB.Close(), "release connection to the %q database", templateDBConf.DBName)
		}()
		_, err = glsql.Migrate(remigrateTemplateDB, false)
		require.NoErrorf(t, err, "failed to run database migration on %q", praefectTemplateDatabase)
	}

	// Release advisory lock as soon as possible to unblock other tests from execution.
	advisoryUnlock()

	require.NoErrorf(t, templateDB.Close(), "release connection to the %q database", templateDBConf.DBName)

	// NOTE(review): CREATE DATABASE ... TEMPLATE requires that no other session is connected to
	// the template. Another test may take the lock and connect to the template right after the
	// unlock above. This presumably relies on Postgres briefly waiting for such sessions to
	// disappear; confirm, or move the unlock below this statement if it turns out to be flaky.
	_, err = postgresDB.Exec(`CREATE DATABASE ` + database + ` TEMPLATE ` + praefectTemplateDatabase)
	require.NoErrorf(t, err, "failed to create %q database", database)

	t.Cleanup(func() {
		if _, ok := getDatabaseEnvironment(t)["PGHOST_PGBOUNCER"]; ok {
			pgbouncerCfg := dbCfg
			// This database name will connect us to the special admin console.
			pgbouncerCfg.DBName = "pgbouncer"

			// We cannot use `requireSQLOpen()` because it would ping the database,
			// which is not supported by the PgBouncer admin console.
			pgbouncerDB, err := sql.Open("pgx", glsql.DSN(pgbouncerCfg, false))
			require.NoError(t, err)
			defer testhelper.MustClose(t, pgbouncerDB)

			// Trying to release connections like we do with the "normal" Postgres
			// database regularly results in flaky tests with PgBouncer given that the
			// connections are seemingly never released. Instead, we kill PgBouncer
			// connections by connecting to its admin console and using the KILL
			// command, which instructs it to kill all client and server connections.
			_, err = pgbouncerDB.Exec("KILL " + database)
			require.NoError(t, err, "killing PgBouncer connections")
		}

		// dbCfg still targets the "postgres" maintenance database here.
		adminDB := requireSQLOpen(t, dbCfg, true)
		defer testhelper.MustClose(t, adminDB)

		// We need to force-terminate open connections as for the tasks that use PgBouncer
		// the actual client connected to the database is a PgBouncer and not a test that is
		// running.
		requireTerminateAllConnections(t, adminDB, database)

		_, err := adminDB.Exec("DROP DATABASE " + database)
		require.NoErrorf(t, err, "failed to drop %q database", database)
	})

	// Connect to the testing database with optional PgBouncer
	testDBCfg := dbCfg
	testDBCfg.DBName = database
	praefectTestDB := requireSQLOpen(t, testDBCfg, false)
	t.Cleanup(func() {
		// The pool may already have been closed by the test itself (e.g. via DB.Close).
		if err := praefectTestDB.Close(); !errors.Is(err, net.ErrClosed) {
			require.NoErrorf(t, err, "release connection to the %q database", testDBCfg.DBName)
		}
	})
	return praefectTestDB
}

// databaseExist reports whether pg_database contains an entry named `database`.
func databaseExist(t testing.TB, db *sql.DB, database string) bool {
	const existsQuery = `SELECT EXISTS(SELECT * FROM pg_database WHERE datname = $1)`
	found := scanSingleBool(t, db, existsQuery, database)
	return found
}

// scanSingleBool runs query with args and returns the boolean stored in the single column of
// the first result row. The test fails immediately if the query fails or its result cannot be
// scanned into a bool. Failures are attributed to the caller.
func scanSingleBool(t testing.TB, db *sql.DB, query string, args ...interface{}) bool {
	t.Helper()

	var flag bool
	require.NoErrorf(t, db.QueryRow(query, args...).Scan(&flag), "scan boolean result of %q", query)
	return flag
}

var (
	// Running `gdk env` takes about 250ms on my system and is thus comparatively slow. When
	// running with Praefect as proxy, this time adds up and may thus slow down tests by quite a
	// margin. We thus amortize these costs by only running it once and caching the result.
	databaseEnvOnce sync.Once
	databaseEnv     map[string]string
)

// getDatabaseEnvironment returns the environment used to connect to the test database.
//
// The map contains every variable exported by a successful `gdk env` run. In addition, it
// contains any of PGHOST, PGPORT, PGUSER, PGHOST_PGBOUNCER and PGPORT_PGBOUNCER that `gdk env`
// did not provide but that are set in the process environment. Keys found nowhere are absent.
// The map is computed once per process and the same map is returned on every call. The t
// parameter is currently unused.
func getDatabaseEnvironment(t testing.TB) map[string]string {
	databaseEnvOnce.Do(func() {
		env := make(map[string]string)

		// A failing `gdk env` simply means we are not running in a GDK environment. In that
		// case everything is taken from the process environment below.
		if out, cmdErr := exec.Command("gdk", "env").Output(); cmdErr == nil {
			for _, line := range strings.Split(string(out), "\n") {
				assignment := strings.TrimPrefix(line, "export ")
				if len(assignment) == len(line) {
					continue // not an `export` line
				}

				eq := strings.Index(assignment, "=")
				if eq < 0 {
					continue // no value assigned
				}
				env[assignment[:eq]] = assignment[eq+1:]
			}
		}

		for _, key := range []string{"PGHOST", "PGPORT", "PGUSER", "PGHOST_PGBOUNCER", "PGPORT_PGBOUNCER"} {
			if _, fromGDK := env[key]; fromGDK {
				continue
			}
			if value, set := os.LookupEnv(key); set {
				env[key] = value
			}
		}

		databaseEnv = env
	})

	return databaseEnv
}

// WaitForBlockedQuery polls pg_stat_activity until the current database has an active query that
// starts with queryPrefix (ignoring leading/trailing newlines) and is waiting on a lock. This is
// useful for ensuring another transaction is blocking a query when testing concurrent execution
// of multiple queries.
//
// It polls roughly once per millisecond. If ctx is done while waiting between polls, it returns
// without failing the test. A failing poll query fails the test immediately.
func WaitForBlockedQuery(ctx context.Context, t testing.TB, db glsql.Querier, queryPrefix string) {
	t.Helper()

	pattern := queryPrefix + "%"
	blocked := func() bool {
		t.Helper()

		var found bool
		err := db.QueryRowContext(ctx, `
			SELECT EXISTS (
				SELECT FROM pg_stat_activity
				WHERE TRIM(e'\n' FROM query) LIKE $1
				AND state = 'active'
				AND wait_event_type = 'Lock'
				AND datname = current_database()
			)
		`, pattern).Scan(&found)
		require.NoError(t, err)
		return found
	}

	for !blocked() {
		pause := time.NewTimer(time.Millisecond)
		select {
		case <-pause.C:
		case <-ctx.Done():
			pause.Stop()
			return
		}
	}
}

// SetMigrations ensures the requested number of migrations are up and the remainder are down.
//
// It first applies every known migration. If `up` is smaller than the number of known
// migrations, it then rolls back the difference with sql-migrate's ExecMax in the Down
// direction. If `up` is at least the number of known migrations, all migrations stay applied.
// `up` must not be negative.
//
// The cfg parameter is currently unused and only kept for API compatibility.
func SetMigrations(t testing.TB, db DB, cfg config.Config, up int) {
	t.Helper()
	require.GreaterOrEqual(t, up, 0, "number of up migrations must not be negative")

	// Ensure all migrations are up first
	_, err := glsql.Migrate(db.DB, true)
	require.NoError(t, err)

	migrationCt := len(migrations.All())
	if up >= migrationCt {
		return
	}

	down := migrationCt - up
	migrationSet := migrate.MigrationSet{
		TableName: migrations.MigrationTableName,
	}
	ms := &migrate.MemoryMigrationSource{Migrations: migrations.All()}

	// It would be preferable to use migrate.MigrateDown() here, but that introduces
	// a circular dependency between testdb and datastore.
	n, err := migrationSet.ExecMax(db.DB, "postgres", ms, migrate.Down, down)
	require.NoError(t, err)
	require.Equal(t, down, n)
}