Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2021-12-16 11:20:11 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2021-12-16 11:20:11 +0300
commitf5ca110dbbc1620b1fd5386e432248570b103c66 (patch)
treefcf015a5968e31099d3b47fc42c28828bfb2e352
parent152dd3c663c73f79db4c1ff0f63b9011316e35a0 (diff)
parent689073bfa187427bdbd30ad3eceacd7e6678cc6a (diff)
Merge branch 'pks-pgbouncer-kill-connections' into 'master'
testdb: Fix stuck PgBouncer connections by killing via admin command See merge request gitlab-org/gitaly!4193
-rw-r--r--cmd/praefect/main_test.go6
-rw-r--r--cmd/praefect/subcmd_accept_dataloss_test.go4
-rw-r--r--cmd/praefect/subcmd_dataloss_test.go3
-rw-r--r--cmd/praefect/subcmd_list_untracked_repositories_test.go6
-rw-r--r--cmd/praefect/subcmd_metadata_test.go3
-rw-r--r--cmd/praefect/subcmd_remove_repository_test.go10
-rw-r--r--cmd/praefect/subcmd_set_replication_factor_test.go4
-rw-r--r--cmd/praefect/subcmd_track_repository_test.go6
-rw-r--r--internal/backup/backup_test.go6
-rw-r--r--internal/praefect/auth_test.go4
-rw-r--r--internal/praefect/checks_test.go21
-rw-r--r--internal/praefect/coordinator_pg_test.go3
-rw-r--r--internal/praefect/coordinator_test.go15
-rw-r--r--internal/praefect/datastore/assignment_test.go6
-rw-r--r--internal/praefect/datastore/collector_test.go3
-rw-r--r--internal/praefect/datastore/glsql/mock.go23
-rw-r--r--internal/praefect/datastore/glsql/postgres_test.go30
-rw-r--r--internal/praefect/datastore/glsql/testing.go381
-rw-r--r--internal/praefect/datastore/listener_postgres_test.go17
-rw-r--r--internal/praefect/datastore/postgres_test.go6
-rw-r--r--internal/praefect/datastore/queue_bm_test.go4
-rw-r--r--internal/praefect/datastore/queue_test.go32
-rw-r--r--internal/praefect/datastore/repository_store_bm_test.go4
-rw-r--r--internal/praefect/datastore/repository_store_test.go8
-rw-r--r--internal/praefect/datastore/storage_cleanup_test.go10
-rw-r--r--internal/praefect/datastore/storage_provider_test.go3
-rw-r--r--internal/praefect/helper_test.go4
-rw-r--r--internal/praefect/info_service_test.go3
-rw-r--r--internal/praefect/nodes/health_manager_test.go13
-rw-r--r--internal/praefect/nodes/per_repository_test.go5
-rw-r--r--internal/praefect/nodes/sql_elector_test.go15
-rw-r--r--internal/praefect/reconciler/reconciler_benchmark_test.go4
-rw-r--r--internal/praefect/reconciler/reconciler_test.go6
-rw-r--r--internal/praefect/remove_repository_test.go4
-rw-r--r--internal/praefect/replicator_pg_test.go6
-rw-r--r--internal/praefect/replicator_test.go22
-rw-r--r--internal/praefect/repocleaner/repository_test.go10
-rw-r--r--internal/praefect/repository_exists_test.go4
-rw-r--r--internal/praefect/router_per_repository_test.go7
-rw-r--r--internal/praefect/server_factory_test.go4
-rw-r--r--internal/praefect/server_test.go11
-rw-r--r--internal/testhelper/testdb/db.go424
-rw-r--r--internal/testhelper/testdb/db_test.go (renamed from internal/praefect/datastore/glsql/testing_test.go)4
-rw-r--r--internal/testhelper/testdb/health.go51
-rw-r--r--internal/testhelper/testserver/gitaly.go6
45 files changed, 610 insertions, 611 deletions
diff --git a/cmd/praefect/main_test.go b/cmd/praefect/main_test.go
index 25a615876..9f789bf24 100644
--- a/cmd/praefect/main_test.go
+++ b/cmd/praefect/main_test.go
@@ -13,8 +13,8 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/bootstrap/starter"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
)
func TestMain(m *testing.M) {
@@ -190,8 +190,8 @@ func (m *mockRegisterer) Gather() ([]*dto.MetricFamily, error) {
func TestExcludeDatabaseMetricsFromDefaultMetrics(t *testing.T) {
t.Parallel()
- db := glsql.NewDB(t)
- dbConf := glsql.GetDBConfig(t, db.Name)
+ db := testdb.New(t)
+ dbConf := testdb.GetConfig(t, db.Name)
conf := config.Config{
SocketPath: testhelper.GetTemporaryGitalySocketFileName(t),
diff --git a/cmd/praefect/subcmd_accept_dataloss_test.go b/cmd/praefect/subcmd_accept_dataloss_test.go
index ef80069b7..8ebc3927a 100644
--- a/cmd/praefect/subcmd_accept_dataloss_test.go
+++ b/cmd/praefect/subcmd_accept_dataloss_test.go
@@ -7,9 +7,9 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/service/info"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
)
@@ -35,7 +35,7 @@ func TestAcceptDatalossSubcommand(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- db := glsql.NewDB(t)
+ db := testdb.New(t)
rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
startingGenerations := map[string]int{st1: 1, st2: 0, st3: datastore.GenerationUnknown}
diff --git a/cmd/praefect/subcmd_dataloss_test.go b/cmd/praefect/subcmd_dataloss_test.go
index b4fe18eb6..b46bef8dd 100644
--- a/cmd/praefect/subcmd_dataloss_test.go
+++ b/cmd/praefect/subcmd_dataloss_test.go
@@ -7,7 +7,6 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/service/info"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
@@ -42,7 +41,7 @@ func TestDatalossSubcommand(t *testing.T) {
},
}
- tx := glsql.NewDB(t).Begin(t)
+ tx := testdb.New(t).Begin(t)
defer tx.Rollback(t)
ctx, cancel := testhelper.Context()
diff --git a/cmd/praefect/subcmd_list_untracked_repositories_test.go b/cmd/praefect/subcmd_list_untracked_repositories_test.go
index 527a3219c..c54ecd9ea 100644
--- a/cmd/praefect/subcmd_list_untracked_repositories_test.go
+++ b/cmd/praefect/subcmd_list_untracked_repositories_test.go
@@ -17,9 +17,9 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
)
@@ -64,10 +64,10 @@ func TestListUntrackedRepositories_Exec(t *testing.T) {
g1Addr := testserver.RunGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
g2Addr := testserver.RunGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
- db := glsql.NewDB(t)
+ db := testdb.New(t)
var database string
require.NoError(t, db.QueryRow(`SELECT current_database()`).Scan(&database))
- dbConf := glsql.GetDBConfig(t, database)
+ dbConf := testdb.GetConfig(t, database)
conf := config.Config{
SocketPath: testhelper.GetTemporaryGitalySocketFileName(t),
diff --git a/cmd/praefect/subcmd_metadata_test.go b/cmd/praefect/subcmd_metadata_test.go
index dc5cd2b60..3432457cf 100644
--- a/cmd/praefect/subcmd_metadata_test.go
+++ b/cmd/praefect/subcmd_metadata_test.go
@@ -9,7 +9,6 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/service/info"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
@@ -23,7 +22,7 @@ func TestMetadataSubcommand(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- tx := glsql.NewDB(t).Begin(t)
+ tx := testdb.New(t).Begin(t)
defer tx.Rollback(t)
testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{
diff --git a/cmd/praefect/subcmd_remove_repository_test.go b/cmd/praefect/subcmd_remove_repository_test.go
index bf2ece389..a1a4f6cbc 100644
--- a/cmd/praefect/subcmd_remove_repository_test.go
+++ b/cmd/praefect/subcmd_remove_repository_test.go
@@ -20,9 +20,9 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
)
@@ -75,8 +75,8 @@ func TestRemoveRepository_Exec(t *testing.T) {
g1Addr := testserver.RunGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
g2Srv := testserver.StartGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
- db := glsql.NewDB(t)
- dbConf := glsql.GetDBConfig(t, db.Name)
+ db := testdb.New(t)
+ dbConf := testdb.GetConfig(t, db.Name)
conf := config.Config{
SocketPath: testhelper.GetTemporaryGitalySocketFileName(t),
@@ -233,7 +233,7 @@ func TestRemoveRepository_Exec(t *testing.T) {
<-stopped
}
-func requireNoDatabaseInfo(t *testing.T, db glsql.DB, cmd *removeRepository) {
+func requireNoDatabaseInfo(t *testing.T, db testdb.DB, cmd *removeRepository) {
t.Helper()
var repositoryRowExists bool
require.NoError(t, db.QueryRow(
@@ -259,7 +259,7 @@ func TestRemoveRepository_removeReplicationEvents(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- db := glsql.NewDB(t)
+ db := testdb.New(t)
queue := datastore.NewPostgresReplicationEventQueue(db)
diff --git a/cmd/praefect/subcmd_set_replication_factor_test.go b/cmd/praefect/subcmd_set_replication_factor_test.go
index 6e0d0e3e0..03c469844 100644
--- a/cmd/praefect/subcmd_set_replication_factor_test.go
+++ b/cmd/praefect/subcmd_set_replication_factor_test.go
@@ -8,16 +8,16 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/service/info"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func TestSetReplicationFactorSubcommand(t *testing.T) {
t.Parallel()
- db := glsql.NewDB(t)
+ db := testdb.New(t)
for _, tc := range []struct {
desc string
diff --git a/cmd/praefect/subcmd_track_repository_test.go b/cmd/praefect/subcmd_track_repository_test.go
index ed55c1ce7..e1e08a3c3 100644
--- a/cmd/praefect/subcmd_track_repository_test.go
+++ b/cmd/praefect/subcmd_track_repository_test.go
@@ -15,12 +15,12 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/promtest"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
)
@@ -86,8 +86,8 @@ func TestAddRepository_Exec(t *testing.T) {
g1Addr := g1Srv.Address()
- db := glsql.NewDB(t)
- dbConf := glsql.GetDBConfig(t, db.Name)
+ db := testdb.New(t)
+ dbConf := testdb.GetConfig(t, db.Name)
virtualStorageName := "praefect"
conf := config.Config{
diff --git a/internal/backup/backup_test.go b/internal/backup/backup_test.go
index 9031cf83e..65abe05f8 100644
--- a/internal/backup/backup_test.go
+++ b/internal/backup/backup_test.go
@@ -20,9 +20,9 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
praefectConfig "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"google.golang.org/protobuf/proto"
@@ -285,8 +285,8 @@ func TestManager_Restore_praefect(t *testing.T) {
gitalyAddr := testserver.RunGitalyServer(t, gitalyCfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
- db := glsql.NewDB(t)
- dbConf := glsql.GetDBConfig(t, db.Name)
+ db := testdb.New(t)
+ dbConf := testdb.GetConfig(t, db.Name)
conf := praefectConfig.Config{
SocketPath: testhelper.GetTemporaryGitalySocketFileName(t),
diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go
index af610e240..4dfd6ad8a 100644
--- a/internal/praefect/auth_test.go
+++ b/internal/praefect/auth_test.go
@@ -11,13 +11,13 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config/auth"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/mock"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/transactions"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/promtest"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@@ -149,7 +149,7 @@ func runServer(t *testing.T, token string, required bool) (*grpc.Server, string,
},
}
logEntry := testhelper.NewDiscardingLogEntry(t)
- queue := datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t))
+ queue := datastore.NewPostgresReplicationEventQueue(testdb.New(t))
nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
require.NoError(t, err)
diff --git a/internal/praefect/checks_test.go b/internal/praefect/checks_test.go
index 5b1481ba4..c259db08d 100644
--- a/internal/praefect/checks_test.go
+++ b/internal/praefect/checks_test.go
@@ -16,7 +16,6 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/migrations"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
@@ -72,8 +71,8 @@ func TestPraefectMigrations_success(t *testing.T) {
defer cancel()
var cfg config.Config
- db := glsql.NewDB(t)
- cfg.DB = glsql.GetDBConfig(t, db.Name)
+ db := testdb.New(t)
+ cfg.DB = testdb.GetConfig(t, db.Name)
require.NoError(t, tc.prepare(cfg))
@@ -358,20 +357,20 @@ func runNodes(t *testing.T, nodes []nodeAssertion) ([]*config.Node, func()) {
func TestPostgresReadWriteCheck(t *testing.T) {
testCases := []struct {
desc string
- setup func(t *testing.T, db glsql.DB) config.DB
+ setup func(t *testing.T, db testdb.DB) config.DB
expectedErr string
expectedLog string
}{
{
desc: "read and write work",
- setup: func(t *testing.T, db glsql.DB) config.DB {
- return glsql.GetDBConfig(t, db.Name)
+ setup: func(t *testing.T, db testdb.DB) config.DB {
+ return testdb.GetConfig(t, db.Name)
},
expectedLog: "successfully read from database\nsuccessfully wrote to database\n",
},
{
desc: "read only",
- setup: func(t *testing.T, db glsql.DB) config.DB {
+ setup: func(t *testing.T, db testdb.DB) config.DB {
role := "praefect_ro_role_" + strings.ReplaceAll(uuid.New().String(), "-", "")
_, err := db.Exec(fmt.Sprintf(`
@@ -386,7 +385,7 @@ func TestPostgresReadWriteCheck(t *testing.T) {
require.NoError(t, err)
})
- dbCfg := glsql.GetDBConfig(t, db.Name)
+ dbCfg := testdb.GetConfig(t, db.Name)
dbCfg.User = role
dbCfg.Password = ""
@@ -402,7 +401,7 @@ func TestPostgresReadWriteCheck(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- db := glsql.NewDB(t)
+ db := testdb.New(t)
t.Cleanup(func() { require.NoError(t, db.Close()) })
dbConf := tc.setup(t, db)
@@ -475,8 +474,8 @@ func TestNewUnavailableReposCheck(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- db := glsql.NewDB(t)
- dbCfg := glsql.GetDBConfig(t, db.Name)
+ db := testdb.New(t)
+ dbCfg := testdb.GetConfig(t, db.Name)
conf.DB = dbCfg
rs := datastore.NewPostgresRepositoryStore(db, nil)
diff --git a/internal/praefect/coordinator_pg_test.go b/internal/praefect/coordinator_pg_test.go
index 3eb70b8d0..ee7a5e6ab 100644
--- a/internal/praefect/coordinator_pg_test.go
+++ b/internal/praefect/coordinator_pg_test.go
@@ -12,7 +12,6 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/transactions"
@@ -154,7 +153,7 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
},
}
- db := glsql.NewDB(t)
+ db := testdb.New(t)
for _, tc := range testcases {
t.Run(tc.desc, func(t *testing.T) {
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 70dc062aa..31b913c60 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -28,7 +28,6 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/grpc-proxy/proxy"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/mock"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
@@ -65,7 +64,7 @@ func TestSecondaryRotation(t *testing.T) {
func TestStreamDirectorReadOnlyEnforcement(t *testing.T) {
t.Parallel()
- db := glsql.NewDB(t)
+ db := testdb.New(t)
for _, tc := range []struct {
desc string
readOnly bool
@@ -157,7 +156,7 @@ func TestStreamDirectorMutator(t *testing.T) {
},
},
}
- db := glsql.NewDB(t)
+ db := testdb.New(t)
targetRepo := gitalypb.Repository{
StorageName: "praefect",
@@ -334,7 +333,7 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) {
txMgr := transactions.NewManager(conf)
coordinator := NewCoordinator(
- datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)),
+ datastore.NewPostgresReplicationEventQueue(testdb.New(t)),
rs,
NewNodeManagerRouter(nodeMgr, rs),
txMgr,
@@ -427,7 +426,7 @@ func TestStreamDirectorAccessor(t *testing.T) {
},
}
- queue := datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t))
+ queue := datastore.NewPostgresReplicationEventQueue(testdb.New(t))
targetRepo := gitalypb.Repository{
StorageName: "praefect",
@@ -534,7 +533,7 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) {
},
}
- queue := datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t))
+ queue := datastore.NewPostgresReplicationEventQueue(testdb.New(t))
targetRepo := gitalypb.Repository{
StorageName: "praefect",
@@ -821,7 +820,7 @@ func TestRewrittenRepositoryMessage(t *testing.T) {
func TestStreamDirector_repo_creation(t *testing.T) {
t.Parallel()
- db := glsql.NewDB(t)
+ db := testdb.New(t)
for _, tc := range []struct {
desc string
@@ -1097,7 +1096,7 @@ func TestAbsentCorrelationID(t *testing.T) {
},
}
- queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)))
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t)))
queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
assert.True(t, len(queueInterceptor.GetEnqueued()) < 2, "expected only one event to be created")
return queue.Enqueue(ctx, event)
diff --git a/internal/praefect/datastore/assignment_test.go b/internal/praefect/datastore/assignment_test.go
index faa5691da..3de909a37 100644
--- a/internal/praefect/datastore/assignment_test.go
+++ b/internal/praefect/datastore/assignment_test.go
@@ -8,8 +8,8 @@ import (
"github.com/lib/pq"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
)
func TestAssignmentStore_GetHostAssignments(t *testing.T) {
@@ -20,7 +20,7 @@ func TestAssignmentStore_GetHostAssignments(t *testing.T) {
storage string
}
- db := glsql.NewDB(t)
+ db := testdb.New(t)
configuredStorages := []string{"storage-1", "storage-2", "storage-3"}
for _, tc := range []struct {
@@ -131,7 +131,7 @@ func TestAssignmentStore_SetReplicationFactor(t *testing.T) {
}
}
- db := glsql.NewDB(t)
+ db := testdb.New(t)
for _, tc := range []struct {
desc string
diff --git a/internal/praefect/datastore/collector_test.go b/internal/praefect/datastore/collector_test.go
index 861fe3998..7afb09cd0 100644
--- a/internal/praefect/datastore/collector_test.go
+++ b/internal/praefect/datastore/collector_test.go
@@ -15,7 +15,6 @@ import (
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
)
@@ -36,7 +35,7 @@ func TestRepositoryStoreCollector(t *testing.T) {
replicas replicas
}
- db := glsql.NewDB(t)
+ db := testdb.New(t)
for _, tc := range []struct {
desc string
diff --git a/internal/praefect/datastore/glsql/mock.go b/internal/praefect/datastore/glsql/mock.go
deleted file mode 100644
index ccd1ddb85..000000000
--- a/internal/praefect/datastore/glsql/mock.go
+++ /dev/null
@@ -1,23 +0,0 @@
-package glsql
-
-import (
- "context"
- "database/sql"
-)
-
-// MockQuerier allows for mocking database operations out.
-type MockQuerier struct {
- ExecContextFunc func(context.Context, string, ...interface{}) (sql.Result, error)
- QueryContextFunc func(context.Context, string, ...interface{}) (*sql.Rows, error)
- Querier
-}
-
-// ExecContext runs the mock's ExecContextFunc.
-func (m MockQuerier) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
- return m.ExecContextFunc(ctx, query, args...)
-}
-
-// QueryContext runs the mock's QueryContextFunc.
-func (m MockQuerier) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
- return m.QueryContextFunc(ctx, query, args...)
-}
diff --git a/internal/praefect/datastore/glsql/postgres_test.go b/internal/praefect/datastore/glsql/postgres_test.go
index 590627b21..c5f551695 100644
--- a/internal/praefect/datastore/glsql/postgres_test.go
+++ b/internal/praefect/datastore/glsql/postgres_test.go
@@ -1,4 +1,4 @@
-package glsql
+package glsql_test
import (
"net"
@@ -7,18 +7,20 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
)
func TestOpenDB(t *testing.T) {
- dbCfg := GetDBConfig(t, "postgres")
+ dbCfg := testdb.GetConfig(t, "postgres")
ctx, cancel := testhelper.Context()
defer cancel()
t.Run("failed to ping because of incorrect config", func(t *testing.T) {
badCfg := dbCfg
badCfg.Host = "not-existing.com"
- _, err := OpenDB(ctx, badCfg)
+ _, err := glsql.OpenDB(ctx, badCfg)
require.Error(t, err)
// Locally the error looks like:
// send ping: dial tcp: lookup not-existing.com: no such host
@@ -39,21 +41,21 @@ func TestOpenDB(t *testing.T) {
ctx, cancel := testhelper.Context()
cancel()
- _, err = OpenDB(ctx, badCfg)
+ _, err = glsql.OpenDB(ctx, badCfg)
require.EqualError(t, err, "context canceled")
duration := time.Since(start)
require.Truef(t, duration < time.Second, "connection attempt took %s", duration.String())
})
t.Run("connected with proper config", func(t *testing.T) {
- db, err := OpenDB(ctx, dbCfg)
+ db, err := glsql.OpenDB(ctx, dbCfg)
require.NoError(t, err, "opening of DB with correct configuration must not fail")
require.NoError(t, db.Close())
})
}
func TestUint64Provider(t *testing.T) {
- var provider Uint64Provider
+ var provider glsql.Uint64Provider
dst1 := provider.To()
require.Equal(t, []interface{}{new(uint64)}, dst1, "must be a single value holder")
@@ -76,21 +78,25 @@ func TestUint64Provider(t *testing.T) {
func TestScanAll(t *testing.T) {
t.Parallel()
- db := NewDB(t)
+ db := testdb.New(t)
- var ids Uint64Provider
+ var ids glsql.Uint64Provider
notEmptyRows, err := db.Query("SELECT id FROM (VALUES (1), (200), (300500)) AS t(id)")
require.NoError(t, err)
+ defer func() { require.NoError(t, notEmptyRows.Close()) }()
- require.NoError(t, ScanAll(notEmptyRows, &ids))
+ require.NoError(t, glsql.ScanAll(notEmptyRows, &ids))
require.Equal(t, []uint64{1, 200, 300500}, ids.Values())
+ require.NoError(t, notEmptyRows.Err())
- var nothing Uint64Provider
+ var nothing glsql.Uint64Provider
emptyRows, err := db.Query("SELECT id FROM (VALUES (1), (200), (300500)) AS t(id) WHERE id < 0")
require.NoError(t, err)
+ defer func() { require.NoError(t, emptyRows.Close()) }()
- require.NoError(t, ScanAll(emptyRows, &nothing))
+ require.NoError(t, glsql.ScanAll(emptyRows, &nothing))
require.Equal(t, ([]uint64)(nil), nothing.Values())
+ require.NoError(t, emptyRows.Err())
}
func TestDSN(t *testing.T) {
@@ -209,7 +215,7 @@ func TestDSN(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
- require.Equal(t, tc.out, DSN(tc.in, tc.direct))
+ require.Equal(t, tc.out, glsql.DSN(tc.in, tc.direct))
})
}
}
diff --git a/internal/praefect/datastore/glsql/testing.go b/internal/praefect/datastore/glsql/testing.go
deleted file mode 100644
index bced998d2..000000000
--- a/internal/praefect/datastore/glsql/testing.go
+++ /dev/null
@@ -1,381 +0,0 @@
-package glsql
-
-import (
- "context"
- "database/sql"
- "errors"
- "net"
- "os"
- "os/exec"
- "strconv"
- "strings"
- "sync"
- "testing"
- "time"
-
- "github.com/google/uuid"
- migrate "github.com/rubenv/sql-migrate"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
-)
-
-const (
- advisoryLockIDDatabaseTemplate = 1627644550
- praefectTemplateDatabase = "praefect_template"
-)
-
-// TxWrapper is a simple wrapper around *sql.Tx.
-type TxWrapper struct {
- *sql.Tx
-}
-
-// Rollback executes Rollback operation on the wrapped *sql.Tx if it is set.
-// After execution is sets Tx to nil to prevent errors on the repeated invocations (useful
-// for testing when Rollback is deferred).
-func (txw *TxWrapper) Rollback(t testing.TB) {
- t.Helper()
- if txw.Tx != nil {
- require.NoError(t, txw.Tx.Rollback())
- txw.Tx = nil
- }
-}
-
-// Commit executes Commit operation on the wrapped *sql.Tx if it is set.
-// After execution is sets Tx to nil to prevent errors on the deferred invocations (useful
-// for testing when Rollback is deferred).
-func (txw *TxWrapper) Commit(t testing.TB) {
- t.Helper()
- if txw.Tx != nil {
- require.NoError(t, txw.Tx.Commit())
- txw.Tx = nil
- }
-}
-
-// DB is a helper struct that should be used only for testing purposes.
-type DB struct {
- *sql.DB
- // Name is a name of the database.
- Name string
-}
-
-// Begin starts a new transaction and returns it wrapped into TxWrapper.
-func (db DB) Begin(t testing.TB) *TxWrapper {
- t.Helper()
- tx, err := db.DB.Begin()
- require.NoError(t, err)
- return &TxWrapper{Tx: tx}
-}
-
-// Truncate removes all data from the list of tables and restarts identities for them.
-func (db DB) Truncate(t testing.TB, tables ...string) {
- t.Helper()
-
- for _, table := range tables {
- _, err := db.DB.Exec("DELETE FROM " + table)
- require.NoError(t, err, "database cleanup failed: %s", tables)
- }
-
- _, err := db.DB.Exec("SELECT setval(relname::TEXT, 1, false) from pg_class where relkind = 'S'")
- require.NoError(t, err, "database cleanup failed: %s", tables)
-}
-
-// RequireRowsInTable verifies that `tname` table has `n` amount of rows in it.
-func (db DB) RequireRowsInTable(t *testing.T, tname string, n int) {
- t.Helper()
-
- var count int
- require.NoError(t, db.QueryRow("SELECT COUNT(*) FROM "+tname).Scan(&count))
- require.Equal(t, n, count, "unexpected amount of rows in table: %d instead of %d", count, n)
-}
-
-// TruncateAll removes all data from known set of tables.
-func (db DB) TruncateAll(t testing.TB) {
- db.Truncate(t,
- "replication_queue_job_lock",
- "replication_queue",
- "replication_queue_lock",
- "node_status",
- "shard_primaries",
- "storage_repositories",
- "repositories",
- "virtual_storages",
- "repository_assignments",
- "storage_cleanups",
- )
-}
-
-// MustExec executes `q` with `args` and verifies there are no errors.
-func (db DB) MustExec(t testing.TB, q string, args ...interface{}) {
- _, err := db.DB.Exec(q, args...)
- require.NoError(t, err)
-}
-
-// Close removes schema if it was used and releases connection pool.
-func (db DB) Close() error {
- if err := db.DB.Close(); err != nil {
- return errors.New("failed to release connection pool: " + err.Error())
- }
- return nil
-}
-
-// NewDB returns a wrapper around the database connection pool.
-// Must be used only for testing.
-// The new database with empty relations will be created for each call of this function.
-// It uses env vars:
-// PGHOST - required, URL/socket/dir
-// PGPORT - required, binding port
-// PGUSER - optional, user - `$ whoami` would be used if not provided
-// Once the test is completed the database will be dropped on test cleanup execution.
-func NewDB(t testing.TB) DB {
- t.Helper()
- database := "praefect_" + strings.ReplaceAll(uuid.New().String(), "-", "")
- return DB{DB: initPraefectTestDB(t, database), Name: database}
-}
-
-// GetDBConfig returns the database configuration determined by
-// environment variables. See NewDB() for the list of variables.
-func GetDBConfig(t testing.TB, database string) config.DB {
- env := getDatabaseEnvironment(t)
-
- require.Contains(t, env, "PGHOST", "PGHOST env var expected to be provided to connect to Postgres database")
- require.Contains(t, env, "PGPORT", "PGHOST env var expected to be provided to connect to Postgres database")
-
- portNumber, err := strconv.Atoi(env["PGPORT"])
- require.NoError(t, err, "PGPORT must be a port number of the Postgres database listens for incoming connections")
-
- // connect to 'postgres' database first to re-create testing database from scratch
- conf := config.DB{
- Host: env["PGHOST"],
- Port: portNumber,
- DBName: database,
- SSLMode: "disable",
- User: env["PGUSER"],
- SessionPooled: config.DBConnection{
- Host: env["PGHOST"],
- Port: portNumber,
- },
- }
-
- if bouncerHost, ok := env["PGHOST_PGBOUNCER"]; ok {
- conf.Host = bouncerHost
- }
-
- if bouncerPort, ok := env["PGPORT_PGBOUNCER"]; ok {
- bouncerPortNumber, err := strconv.Atoi(bouncerPort)
- require.NoError(t, err, "PGPORT_PGBOUNCER must be a port number of the PgBouncer")
- conf.Port = bouncerPortNumber
- }
-
- return conf
-}
-
-func requireSQLOpen(t testing.TB, dbCfg config.DB, direct bool) *sql.DB {
- t.Helper()
- db, err := sql.Open("postgres", DSN(dbCfg, direct))
- require.NoErrorf(t, err, "failed to connect to %q database", dbCfg.DBName)
- if !assert.NoErrorf(t, db.Ping(), "failed to communicate with %q database", dbCfg.DBName) {
- require.NoErrorf(t, db.Close(), "release connection to the %q database", dbCfg.DBName)
- }
- return db
-}
-
-func requireTerminateAllConnections(t testing.TB, db *sql.DB, database string) {
- t.Helper()
- _, err := db.Exec("SELECT PG_TERMINATE_BACKEND(pid) FROM PG_STAT_ACTIVITY WHERE datname = '" + database + "'")
- require.NoError(t, err)
-
- // Once the pg_terminate_backend has completed, we may need to wait before the connections
- // are fully released. pg_terminate_backend will return true as long as the signal was
- // sent successfully, but the backend needs to respond to the signal to close the connection.
- // TODO: In Postgre 14, pg_terminate_backend takes an optional timeout argument that makes it a blocking
- // call. https://gitlab.com/gitlab-org/gitaly/-/issues/3937 tracks the refactor work to remove this
- // require.Eventuallyf call in favor of passing in a timeout to pg_terminate_backend
- require.Eventuallyf(t, func() bool {
- var openConnections int
- require.NoError(t, db.QueryRow(
- `SELECT COUNT(*) FROM pg_stat_activity
- WHERE datname = $1 AND pid != pg_backend_pid()`, database).
- Scan(&openConnections))
- return openConnections == 0
- }, 20*time.Second, 10*time.Millisecond, "wait for all connections to be terminated")
-}
-
-func initPraefectTestDB(t testing.TB, database string) *sql.DB {
- t.Helper()
-
- dbCfg := GetDBConfig(t, "postgres")
- // We require a direct connection to the Postgres instance and not through the PgBouncer
- // because we use transaction pool mood for it and it doesn't work well for system advisory locks.
- postgresDB := requireSQLOpen(t, dbCfg, true)
- defer func() { require.NoErrorf(t, postgresDB.Close(), "release connection to the %q database", dbCfg.DBName) }()
-
- // Acquire exclusive advisory lock to prevent other concurrent test from doing the same.
- _, err := postgresDB.Exec(`SELECT pg_advisory_lock($1)`, advisoryLockIDDatabaseTemplate)
- require.NoError(t, err, "not able to acquire lock for synchronisation")
- var advisoryUnlock func()
- advisoryUnlock = func() {
- require.True(t, scanSingleBool(t, postgresDB, `SELECT pg_advisory_unlock($1)`, advisoryLockIDDatabaseTemplate), "release advisory lock")
- advisoryUnlock = func() {}
- }
- defer func() { advisoryUnlock() }()
-
- templateDBExists := databaseExist(t, postgresDB, praefectTemplateDatabase)
- if !templateDBExists {
- _, err := postgresDB.Exec("CREATE DATABASE " + praefectTemplateDatabase + " WITH ENCODING 'UTF8'")
- require.NoErrorf(t, err, "failed to create %q database", praefectTemplateDatabase)
- }
-
- templateDBConf := GetDBConfig(t, praefectTemplateDatabase)
- templateDB := requireSQLOpen(t, templateDBConf, true)
- defer func() {
- require.NoErrorf(t, templateDB.Close(), "release connection to the %q database", templateDBConf.DBName)
- }()
-
- if _, err := Migrate(templateDB, false); err != nil {
- // If database has unknown migration we try to re-create template database with
- // current migration. It may be caused by other code changes done in another branch.
- if pErr := (*migrate.PlanError)(nil); errors.As(err, &pErr) {
- if strings.EqualFold(pErr.ErrorMessage, "unknown migration in database") {
- require.NoErrorf(t, templateDB.Close(), "release connection to the %q database", templateDBConf.DBName)
-
- _, err = postgresDB.Exec("DROP DATABASE " + praefectTemplateDatabase)
- require.NoErrorf(t, err, "failed to drop %q database", praefectTemplateDatabase)
- _, err = postgresDB.Exec("CREATE DATABASE " + praefectTemplateDatabase + " WITH ENCODING 'UTF8'")
- require.NoErrorf(t, err, "failed to create %q database", praefectTemplateDatabase)
-
- remigrateTemplateDB := requireSQLOpen(t, templateDBConf, true)
- defer func() {
- require.NoErrorf(t, remigrateTemplateDB.Close(), "release connection to the %q database", templateDBConf.DBName)
- }()
- _, err = Migrate(remigrateTemplateDB, false)
- require.NoErrorf(t, err, "failed to run database migration on %q", praefectTemplateDatabase)
- } else {
- require.NoErrorf(t, err, "failed to run database migration on %q", praefectTemplateDatabase)
- }
- } else {
- require.NoErrorf(t, err, "failed to run database migration on %q", praefectTemplateDatabase)
- }
- }
-
- // Release advisory lock as soon as possible to unblock other tests from execution.
- advisoryUnlock()
-
- require.NoErrorf(t, templateDB.Close(), "release connection to the %q database", templateDBConf.DBName)
-
- _, err = postgresDB.Exec(`CREATE DATABASE ` + database + ` TEMPLATE ` + praefectTemplateDatabase)
- require.NoErrorf(t, err, "failed to create %q database", praefectTemplateDatabase)
-
- t.Cleanup(func() {
- dbCfg.DBName = "postgres"
- postgresDB := requireSQLOpen(t, dbCfg, true)
- defer func() { require.NoErrorf(t, postgresDB.Close(), "release connection to the %q database", dbCfg.DBName) }()
-
- // We need to force-terminate open connections as for the tasks that use PgBouncer
- // the actual client connected to the database is a PgBouncer and not a test that is
- // running.
- requireTerminateAllConnections(t, postgresDB, database)
-
- _, err = postgresDB.Exec("DROP DATABASE " + database)
- require.NoErrorf(t, err, "failed to drop %q database", database)
- })
-
- // Connect to the testing database with optional PgBouncer
- dbCfg.DBName = database
- praefectTestDB := requireSQLOpen(t, dbCfg, false)
- t.Cleanup(func() {
- if err := praefectTestDB.Close(); !errors.Is(err, net.ErrClosed) {
- require.NoErrorf(t, err, "release connection to the %q database", dbCfg.DBName)
- }
- })
- return praefectTestDB
-}
-
-func databaseExist(t testing.TB, db *sql.DB, database string) bool {
- return scanSingleBool(t, db, `SELECT EXISTS(SELECT * FROM pg_database WHERE datname = $1)`, database)
-}
-
-func scanSingleBool(t testing.TB, db *sql.DB, query string, args ...interface{}) bool {
- var flag bool
- row := db.QueryRow(query, args...)
- require.NoError(t, row.Scan(&flag))
- return flag
-}
-
-var (
- // Running `gdk env` takes about 250ms on my system and is thus comparatively slow. When
- // running with Praefect as proxy, this time adds up and may thus slow down tests by quite a
- // margin. We thus amortize these costs by only running it once.
- databaseEnvOnce sync.Once
- databaseEnv map[string]string
-)
-
-func getDatabaseEnvironment(t testing.TB) map[string]string {
- databaseEnvOnce.Do(func() {
- envvars := map[string]string{}
-
- // We only process output if `gdk env` returned success. If it didn't, we simply assume that
- // we are not running in a GDK environment and will try to extract variables from the
- // environment instead.
- if output, err := exec.Command("gdk", "env").Output(); err == nil {
- for _, line := range strings.Split(string(output), "\n") {
- const prefix = "export "
- if !strings.HasPrefix(line, prefix) {
- continue
- }
-
- split := strings.SplitN(strings.TrimPrefix(line, prefix), "=", 2)
- if len(split) != 2 {
- continue
- }
-
- envvars[split[0]] = split[1]
- }
- }
-
- for _, key := range []string{"PGHOST", "PGPORT", "PGUSER", "PGHOST_PGBOUNCER", "PGPORT_PGBOUNCER"} {
- if _, ok := envvars[key]; !ok {
- value, ok := os.LookupEnv(key)
- if ok {
- envvars[key] = value
- }
- }
- }
-
- databaseEnv = envvars
- })
-
- return databaseEnv
-}
-
-// WaitForBlockedQuery is a helper that waits until a blocked query matching the prefix is present in the
-// database. This is useful for ensuring another transaction is blocking a query when testing concurrent
-// execution of multiple queries.
-func WaitForBlockedQuery(ctx context.Context, t testing.TB, db Querier, queryPrefix string) {
- t.Helper()
-
- for {
- var queryBlocked bool
- require.NoError(t, db.QueryRowContext(ctx, `
- SELECT EXISTS (
- SELECT FROM pg_stat_activity
- WHERE TRIM(e'\n' FROM query) LIKE $1
- AND state = 'active'
- AND wait_event_type = 'Lock'
- AND datname = current_database()
- )
- `, queryPrefix+"%").Scan(&queryBlocked))
-
- if queryBlocked {
- return
- }
-
- retry := time.NewTimer(time.Millisecond)
- select {
- case <-ctx.Done():
- retry.Stop()
- return
- case <-retry.C:
- }
- }
-}
diff --git a/internal/praefect/datastore/listener_postgres_test.go b/internal/praefect/datastore/listener_postgres_test.go
index 96bed5f9f..26013ebd6 100644
--- a/internal/praefect/datastore/listener_postgres_test.go
+++ b/internal/praefect/datastore/listener_postgres_test.go
@@ -17,6 +17,7 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
)
func TestNewPostgresListener(t *testing.T) {
@@ -85,13 +86,13 @@ func (mlh mockListenHandler) Connected() {
func TestPostgresListener_Listen(t *testing.T) {
t.Parallel()
- db := glsql.NewDB(t)
+ db := testdb.New(t)
logger := testhelper.NewDiscardingLogger(t)
newOpts := func() PostgresListenerOpts {
opts := DefaultPostgresListenerOpts
- opts.Addr = glsql.DSN(glsql.GetDBConfig(t, db.Name), true)
+ opts.Addr = glsql.DSN(testdb.GetConfig(t, db.Name), true)
opts.MinReconnectInterval = time.Nanosecond
opts.MaxReconnectInterval = time.Minute
return opts
@@ -367,7 +368,7 @@ func requireEqualNotificationEntries(t *testing.T, d string, entries []notificat
func TestPostgresListener_Listen_repositories_delete(t *testing.T) {
t.Parallel()
- db := glsql.NewDB(t)
+ db := testdb.New(t)
const channel = "repositories_updates"
@@ -400,7 +401,7 @@ func TestPostgresListener_Listen_repositories_delete(t *testing.T) {
func TestPostgresListener_Listen_storage_repositories_insert(t *testing.T) {
t.Parallel()
- db := glsql.NewDB(t)
+ db := testdb.New(t)
const channel = "storage_repositories_updates"
@@ -432,7 +433,7 @@ func TestPostgresListener_Listen_storage_repositories_insert(t *testing.T) {
func TestPostgresListener_Listen_storage_repositories_update(t *testing.T) {
t.Parallel()
- db := glsql.NewDB(t)
+ db := testdb.New(t)
const channel = "storage_repositories_updates"
@@ -460,7 +461,7 @@ func TestPostgresListener_Listen_storage_repositories_update(t *testing.T) {
func TestPostgresListener_Listen_storage_empty_notification(t *testing.T) {
t.Parallel()
- db := glsql.NewDB(t)
+ db := testdb.New(t)
const channel = "storage_repositories_updates"
@@ -479,7 +480,7 @@ func TestPostgresListener_Listen_storage_empty_notification(t *testing.T) {
func TestPostgresListener_Listen_storage_repositories_delete(t *testing.T) {
t.Parallel()
- db := glsql.NewDB(t)
+ db := testdb.New(t)
const channel = "storage_repositories_updates"
@@ -523,7 +524,7 @@ func testListener(t *testing.T, dbName, channel string, setup func(t *testing.T)
}
opts := DefaultPostgresListenerOpts
- opts.Addr = glsql.DSN(glsql.GetDBConfig(t, dbName), true)
+ opts.Addr = glsql.DSN(testdb.GetConfig(t, dbName), true)
opts.Channels = []string{channel}
handler := mockListenHandler{OnNotification: callback, OnConnected: func() { close(readyChan) }}
diff --git a/internal/praefect/datastore/postgres_test.go b/internal/praefect/datastore/postgres_test.go
index 9ae3a78be..f1a8f7237 100644
--- a/internal/praefect/datastore/postgres_test.go
+++ b/internal/praefect/datastore/postgres_test.go
@@ -5,15 +5,15 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
)
func TestMigrateStatus(t *testing.T) {
t.Parallel()
- db := glsql.NewDB(t)
+ db := testdb.New(t)
config := config.Config{
- DB: glsql.GetDBConfig(t, db.Name),
+ DB: testdb.GetConfig(t, db.Name),
}
_, err := db.Exec("INSERT INTO schema_migrations VALUES ('2020_01_01_test', NOW())")
diff --git a/internal/praefect/datastore/queue_bm_test.go b/internal/praefect/datastore/queue_bm_test.go
index 8fea25258..fd44b8167 100644
--- a/internal/praefect/datastore/queue_bm_test.go
+++ b/internal/praefect/datastore/queue_bm_test.go
@@ -4,8 +4,8 @@ import (
"testing"
"github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
)
func BenchmarkPostgresReplicationEventQueue_Acknowledge(b *testing.B) {
@@ -31,7 +31,7 @@ func BenchmarkPostgresReplicationEventQueue_Acknowledge(b *testing.B) {
}
func benchmarkPostgresReplicationEventQueueAcknowledge(b *testing.B, setup map[JobState]int) {
- db := glsql.NewDB(b)
+ db := testdb.New(b)
ctx, cancel := testhelper.Context()
defer cancel()
diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go
index aeeff4475..64a73c71f 100644
--- a/internal/praefect/datastore/queue_test.go
+++ b/internal/praefect/datastore/queue_test.go
@@ -8,13 +8,13 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
)
func TestPostgresReplicationEventQueue_DeleteReplicaUniqueIndex(t *testing.T) {
t.Parallel()
- db := glsql.NewDB(t)
+ db := testdb.New(t)
for _, tc := range []struct {
desc string
existingJob *ReplicationEvent
@@ -142,7 +142,7 @@ func TestPostgresReplicationEventQueue_DeleteReplicaUniqueIndex(t *testing.T) {
func TestPostgresReplicationEventQueue_Enqueue(t *testing.T) {
t.Parallel()
- db := glsql.NewDB(t)
+ db := testdb.New(t)
ctx, cancel := testhelper.Context()
defer cancel()
@@ -189,7 +189,7 @@ func TestPostgresReplicationEventQueue_Enqueue(t *testing.T) {
func TestPostgresReplicationEventQueue_DeleteReplicaInfiniteAttempts(t *testing.T) {
t.Parallel()
- queue := NewPostgresReplicationEventQueue(glsql.NewDB(t))
+ queue := NewPostgresReplicationEventQueue(testdb.New(t))
ctx, cancel := testhelper.Context()
defer cancel()
@@ -240,7 +240,7 @@ func TestPostgresReplicationEventQueue_DeleteReplicaInfiniteAttempts(t *testing.
func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) {
t.Parallel()
- db := glsql.NewDB(t)
+ db := testdb.New(t)
ctx, cancel := testhelper.Context()
defer cancel()
@@ -374,7 +374,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) {
func TestPostgresReplicationEventQueue_Dequeue(t *testing.T) {
t.Parallel()
- db := glsql.NewDB(t)
+ db := testdb.New(t)
ctx, cancel := testhelper.Context()
defer cancel()
@@ -423,7 +423,7 @@ func TestPostgresReplicationEventQueue_Dequeue(t *testing.T) {
// expected results are listed as literals on purpose to be more explicit about what is going on with data
func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) {
t.Parallel()
- db := glsql.NewDB(t)
+ db := testdb.New(t)
ctx, cancel := testhelper.Context()
defer cancel()
@@ -533,7 +533,7 @@ func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) {
func TestPostgresReplicationEventQueue_DequeueSameStorageOtherRepository(t *testing.T) {
t.Parallel()
- db := glsql.NewDB(t)
+ db := testdb.New(t)
ctx, cancel := testhelper.Context()
defer cancel()
@@ -596,7 +596,7 @@ func TestPostgresReplicationEventQueue_DequeueSameStorageOtherRepository(t *test
func TestPostgresReplicationEventQueue_Acknowledge(t *testing.T) {
t.Parallel()
- db := glsql.NewDB(t)
+ db := testdb.New(t)
ctx, cancel := testhelper.Context()
defer cancel()
@@ -640,7 +640,7 @@ func TestPostgresReplicationEventQueue_Acknowledge(t *testing.T) {
func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
t.Parallel()
- db := glsql.NewDB(t)
+ db := testdb.New(t)
ctx, cancel := testhelper.Context()
defer cancel()
@@ -829,7 +829,7 @@ func TestPostgresReplicationEventQueue_StartHealthUpdate(t *testing.T) {
eventType4 := eventType1
eventType4.Job.TargetNodeStorage = "s-2"
- db := glsql.NewDB(t)
+ db := testdb.New(t)
t.Run("no events is valid", func(t *testing.T) {
// 'qc' is not initialized, so the test will fail if there will be an attempt to make SQL operation
@@ -976,7 +976,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) {
eventType4 := eventType3
eventType4.Job.TargetNodeStorage = "gitaly-3"
- db := glsql.NewDB(t)
+ db := testdb.New(t)
t.Run("no stale jobs yet", func(t *testing.T) {
db.TruncateAll(t)
@@ -1065,7 +1065,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) {
})
}
-func requireEvents(t *testing.T, ctx context.Context, db glsql.DB, expected []ReplicationEvent) {
+func requireEvents(t *testing.T, ctx context.Context, db testdb.DB, expected []ReplicationEvent) {
t.Helper()
// as it is not possible to expect exact time of entity creation/update we do not fetch it from database
@@ -1093,7 +1093,7 @@ type LockRow struct {
Acquired bool
}
-func requireLocks(t *testing.T, ctx context.Context, db glsql.DB, expected []LockRow) {
+func requireLocks(t *testing.T, ctx context.Context, db testdb.DB, expected []LockRow) {
t.Helper()
sqlStmt := `SELECT id, acquired FROM replication_queue_lock`
@@ -1118,7 +1118,7 @@ type JobLockRow struct {
TriggeredAt time.Time
}
-func requireJobLocks(t *testing.T, ctx context.Context, db glsql.DB, expected []JobLockRow) {
+func requireJobLocks(t *testing.T, ctx context.Context, db testdb.DB, expected []JobLockRow) {
t.Helper()
actual := fetchJobLocks(t, ctx, db)
@@ -1128,7 +1128,7 @@ func requireJobLocks(t *testing.T, ctx context.Context, db glsql.DB, expected []
require.ElementsMatch(t, expected, actual)
}
-func fetchJobLocks(t *testing.T, ctx context.Context, db glsql.DB) []JobLockRow {
+func fetchJobLocks(t *testing.T, ctx context.Context, db testdb.DB) []JobLockRow {
t.Helper()
sqlStmt := `SELECT job_id, lock_id, triggered_at FROM replication_queue_job_lock ORDER BY job_id`
rows, err := db.QueryContext(ctx, sqlStmt)
diff --git a/internal/praefect/datastore/repository_store_bm_test.go b/internal/praefect/datastore/repository_store_bm_test.go
index 8555c971b..b0ffbbfad 100644
--- a/internal/praefect/datastore/repository_store_bm_test.go
+++ b/internal/praefect/datastore/repository_store_bm_test.go
@@ -5,8 +5,8 @@ import (
"testing"
"github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
)
// The test setup takes a lot of time, so it is better to run each sub-benchmark separately with limit on number of repeats.
@@ -38,7 +38,7 @@ func BenchmarkPostgresRepositoryStore_GetConsistentStorages(b *testing.B) {
}
func benchmarkGetConsistentStorages(b *testing.B, nstorages, nrepositories int) {
- db := glsql.NewDB(b)
+ db := testdb.New(b)
ctx, cancel := testhelper.Context()
defer cancel()
diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go
index 291e56a26..7692a4779 100644
--- a/internal/praefect/datastore/repository_store_test.go
+++ b/internal/praefect/datastore/repository_store_test.go
@@ -116,7 +116,7 @@ FROM storage_repositories
}
func TestRepositoryStore_Postgres(t *testing.T) {
- db := glsql.NewDB(t)
+ db := testdb.New(t)
testRepositoryStore(t, func(t *testing.T, storages map[string][]string) (RepositoryStore, requireStateFunc) {
db.TruncateAll(t)
gs := NewPostgresRepositoryStore(db, storages)
@@ -129,7 +129,7 @@ func TestRepositoryStore_Postgres(t *testing.T) {
}
func TestRepositoryStore_incrementGenerationConcurrently(t *testing.T) {
- db := glsql.NewDB(t)
+ db := testdb.New(t)
type call struct {
primary string
@@ -214,7 +214,7 @@ func TestRepositoryStore_incrementGenerationConcurrently(t *testing.T) {
require.NoError(t, err)
go func() {
- glsql.WaitForBlockedQuery(ctx, t, db, "WITH updated_replicas AS (")
+ testdb.WaitForBlockedQuery(ctx, t, db, "WITH updated_replicas AS (")
firstTx.Commit(t)
}()
@@ -1191,7 +1191,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
func TestPostgresRepositoryStore_GetRepositoryMetadata(t *testing.T) {
t.Parallel()
- db := glsql.NewDB(t)
+ db := testdb.New(t)
for _, tc := range []struct {
desc string
nonExistentRepository bool
diff --git a/internal/praefect/datastore/storage_cleanup_test.go b/internal/praefect/datastore/storage_cleanup_test.go
index 419115119..77ee0dd22 100644
--- a/internal/praefect/datastore/storage_cleanup_test.go
+++ b/internal/praefect/datastore/storage_cleanup_test.go
@@ -6,15 +6,15 @@ import (
"time"
"github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
)
func TestStorageCleanup_Populate(t *testing.T) {
t.Parallel()
ctx, cancel := testhelper.Context()
defer cancel()
- db := glsql.NewDB(t)
+ db := testdb.New(t)
storageCleanup := NewStorageCleanup(db.DB)
require.NoError(t, storageCleanup.Populate(ctx, "praefect", "gitaly-1"))
@@ -37,7 +37,7 @@ func TestStorageCleanup_AcquireNextStorage(t *testing.T) {
t.Parallel()
ctx, cancel := testhelper.Context()
defer cancel()
- db := glsql.NewDB(t)
+ db := testdb.New(t)
storageCleanup := NewStorageCleanup(db.DB)
t.Run("ok", func(t *testing.T) {
@@ -179,7 +179,7 @@ func TestStorageCleanup_Exists(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- db := glsql.NewDB(t)
+ db := testdb.New(t)
repoStore := NewPostgresRepositoryStore(db.DB, nil)
require.NoError(t, repoStore.CreateRepository(ctx, 0, "vs", "p/1", "replica-path-1", "g1", []string{"g2", "g3"}, nil, false, false))
@@ -250,7 +250,7 @@ type storageCleanupRow struct {
TriggeredAt sql.NullTime
}
-func getAllStoragesCleanup(t testing.TB, db glsql.DB) []storageCleanupRow {
+func getAllStoragesCleanup(t testing.TB, db testdb.DB) []storageCleanupRow {
rows, err := db.Query(`SELECT * FROM storage_cleanups`)
require.NoError(t, err)
defer func() {
diff --git a/internal/praefect/datastore/storage_provider_test.go b/internal/praefect/datastore/storage_provider_test.go
index c3c6f2e98..8c0fd2fd9 100644
--- a/internal/praefect/datastore/storage_provider_test.go
+++ b/internal/praefect/datastore/storage_provider_test.go
@@ -16,12 +16,13 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
)
func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
t.Parallel()
- db := glsql.NewDB(t)
+ db := testdb.New(t)
rs := NewPostgresRepositoryStore(db, nil)
t.Run("unknown virtual storage", func(t *testing.T) {
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index 8b267a8e0..d9eed5a81 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -15,7 +15,6 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/log"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/grpc-proxy/proxy"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/mock"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
@@ -23,6 +22,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/transactions"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/promtest"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
correlation "gitlab.com/gitlab-org/labkit/correlation/grpc"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
@@ -119,7 +119,7 @@ func withMockBackends(t testing.TB, backends map[string]mock.SimpleServiceServer
}
func defaultQueue(t testing.TB) datastore.ReplicationEventQueue {
- return datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t))
+ return datastore.NewPostgresReplicationEventQueue(testdb.New(t))
}
func defaultTxMgr(conf config.Config) *transactions.Manager {
diff --git a/internal/praefect/info_service_test.go b/internal/praefect/info_service_test.go
index 64129875b..8e270b87b 100644
--- a/internal/praefect/info_service_test.go
+++ b/internal/praefect/info_service_test.go
@@ -12,7 +12,6 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/repository"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
@@ -65,7 +64,7 @@ func TestInfoService_RepositoryReplicas(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- tx := glsql.NewDB(t).Begin(t)
+ tx := testdb.New(t).Begin(t)
defer tx.Rollback(t)
testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{
diff --git a/internal/praefect/nodes/health_manager_test.go b/internal/praefect/nodes/health_manager_test.go
index 9645d7a9a..9dd4e3da8 100644
--- a/internal/praefect/nodes/health_manager_test.go
+++ b/internal/praefect/nodes/health_manager_test.go
@@ -10,6 +10,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"
)
@@ -60,7 +61,7 @@ func TestHealthManager(t *testing.T) {
HealthConsensus map[string][]string
}
- db := glsql.NewDB(t)
+ db := testdb.New(t)
for _, tc := range []struct {
desc string
@@ -558,7 +559,7 @@ func TestHealthManager_databaseTimeout(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- db := glsql.NewDB(t)
+ db := testdb.New(t)
blockingTx := db.Begin(t)
defer blockingTx.Rollback(t)
@@ -596,14 +597,14 @@ func TestHealthManager_databaseTimeout(t *testing.T) {
}()
// Wait until the blocked query is waiting.
- glsql.WaitForBlockedQuery(ctx, t, db, "INSERT INTO node_status")
+ testdb.WaitForBlockedQuery(ctx, t, db, "INSERT INTO node_status")
// Simulate a timeout.
timeoutQuery()
// Query should have been canceled.
require.EqualError(t, <-blockedErr, "update checks: pq: canceling statement due to user request")
}
-func predateHealthChecks(t testing.TB, db glsql.DB, amount time.Duration) {
+func predateHealthChecks(t testing.TB, db testdb.DB, amount time.Duration) {
t.Helper()
_, err := db.Exec(`
@@ -618,7 +619,7 @@ func predateHealthChecks(t testing.TB, db glsql.DB, amount time.Duration) {
// This test case ensures the record updates are done in an ordered manner to avoid concurrent writes
// deadlocking. Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/3907
func TestHealthManager_orderedWrites(t *testing.T) {
- db := glsql.NewDB(t)
+ db := testdb.New(t)
tx1 := db.Begin(t).Tx
defer func() { _ = tx1.Rollback() }()
@@ -648,7 +649,7 @@ func TestHealthManager_orderedWrites(t *testing.T) {
}()
// Wait for tx2 to be blocked on the gitaly-1 lock acquired by tx1
- glsql.WaitForBlockedQuery(ctx, t, db, "INSERT INTO node_status")
+ testdb.WaitForBlockedQuery(ctx, t, db, "INSERT INTO node_status")
// Ensure tx1 can acquire lock on gitaly-2.
require.NoError(t, hm1.updateHealthChecks(ctx, []string{virtualStorage}, []string{"gitaly-2"}, []bool{true}))
diff --git a/internal/praefect/nodes/per_repository_test.go b/internal/praefect/nodes/per_repository_test.go
index 8d872a5dd..74a9915b8 100644
--- a/internal/praefect/nodes/per_repository_test.go
+++ b/internal/praefect/nodes/per_repository_test.go
@@ -9,7 +9,6 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
)
@@ -47,7 +46,7 @@ func TestPerRepositoryElector(t *testing.T) {
primary matcher
}
- db := glsql.NewDB(t)
+ db := testdb.New(t)
for _, tc := range []struct {
desc string
@@ -523,7 +522,7 @@ func TestPerRepositoryElector(t *testing.T) {
const repositoryID int64 = 1
for _, step := range tc.steps {
- runElection := func(tx *glsql.TxWrapper) (string, *logrus.Entry) {
+ runElection := func(tx *testdb.TxWrapper) (string, *logrus.Entry) {
testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect-0": step.healthyNodes})
logger, hook := test.NewNullLogger()
diff --git a/internal/praefect/nodes/sql_elector_test.go b/internal/praefect/nodes/sql_elector_test.go
index a483d4271..9050d7287 100644
--- a/internal/praefect/nodes/sql_elector_test.go
+++ b/internal/praefect/nodes/sql_elector_test.go
@@ -16,6 +16,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/promtest"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@@ -29,7 +30,7 @@ var shardName = "test-shard-0"
func TestGetPrimaryAndSecondaries(t *testing.T) {
t.Parallel()
- db := glsql.NewDB(t)
+ db := testdb.New(t)
logger := testhelper.NewDiscardingLogger(t).WithField("test", t.Name())
praefectSocket := testhelper.GetTemporaryGitalySocketFileName(t)
@@ -74,7 +75,7 @@ func TestGetPrimaryAndSecondaries(t *testing.T) {
func TestSqlElector_slow_execution(t *testing.T) {
t.Parallel()
- db := glsql.NewDB(t)
+ db := testdb.New(t)
praefectSocket := "unix://" + testhelper.GetTemporaryGitalySocketFileName(t)
logger := testhelper.NewDiscardingLogger(t).WithField("test", t.Name())
@@ -113,7 +114,7 @@ func TestSqlElector_slow_execution(t *testing.T) {
func TestBasicFailover(t *testing.T) {
t.Parallel()
- db := glsql.NewDB(t)
+ db := testdb.New(t)
logger := testhelper.NewDiscardingLogger(t).WithField("test", t.Name())
praefectSocket := testhelper.GetTemporaryGitalySocketFileName(t)
@@ -224,7 +225,7 @@ func TestBasicFailover(t *testing.T) {
func TestElectDemotedPrimary(t *testing.T) {
t.Parallel()
- tx := glsql.NewDB(t).Begin(t)
+ tx := testdb.New(t).Begin(t)
defer tx.Rollback(t)
node := config.Node{Storage: "gitaly-0"}
@@ -263,7 +264,7 @@ func TestElectDemotedPrimary(t *testing.T) {
// predateLastSeenActiveAt shifts the last_seen_active_at column to an earlier time. This avoids
// waiting for the node's status to become unhealthy.
-func predateLastSeenActiveAt(t testing.TB, db glsql.DB, shardName, nodeName string, amount time.Duration) {
+func predateLastSeenActiveAt(t testing.TB, db testdb.DB, shardName, nodeName string, amount time.Duration) {
t.Helper()
_, err := db.Exec(`
@@ -290,7 +291,7 @@ func predateElection(t testing.TB, ctx context.Context, db glsql.Querier, shardN
func TestElectNewPrimary(t *testing.T) {
t.Parallel()
- db := glsql.NewDB(t)
+ db := testdb.New(t)
ns := []*nodeStatus{{
node: config.Node{
@@ -462,7 +463,7 @@ func TestConnectionMultiplexing(t *testing.T) {
go srv.Serve(ln)
- db := glsql.NewDB(t)
+ db := testdb.New(t)
mgr, err := NewManager(
testhelper.NewDiscardingLogEntry(t),
config.Config{
diff --git a/internal/praefect/reconciler/reconciler_benchmark_test.go b/internal/praefect/reconciler/reconciler_benchmark_test.go
index cbcbebfe3..4c11630b8 100644
--- a/internal/praefect/reconciler/reconciler_benchmark_test.go
+++ b/internal/praefect/reconciler/reconciler_benchmark_test.go
@@ -7,8 +7,8 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
)
func BenchmarkReconcile(b *testing.B) {
@@ -31,7 +31,7 @@ func benchmarkReconcile(b *testing.B, numRepositories int, worstCase bool) {
ctx, cancel := testhelper.Context()
defer cancel()
- db := glsql.NewDB(b)
+ db := testdb.New(b)
behind := 0
if worstCase {
diff --git a/internal/praefect/reconciler/reconciler_test.go b/internal/praefect/reconciler/reconciler_test.go
index 0fea162ad..54b9e7b93 100644
--- a/internal/praefect/reconciler/reconciler_test.go
+++ b/internal/praefect/reconciler/reconciler_test.go
@@ -15,8 +15,8 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
)
func TestReconciler(t *testing.T) {
@@ -82,7 +82,7 @@ func TestReconciler(t *testing.T) {
return out
}
- db := glsql.NewDB(t)
+ db := testdb.New(t)
for _, tc := range []struct {
desc string
@@ -1176,7 +1176,7 @@ func TestReconciler_renames(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- db := glsql.NewDB(t)
+ db := testdb.New(t)
for _, tc := range []struct {
desc string
diff --git a/internal/praefect/remove_repository_test.go b/internal/praefect/remove_repository_test.go
index 0aec0c1f8..d2a0e9629 100644
--- a/internal/praefect/remove_repository_test.go
+++ b/internal/praefect/remove_repository_test.go
@@ -13,11 +13,11 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/grpc-proxy/proxy"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"google.golang.org/grpc"
@@ -41,7 +41,7 @@ func testRemoveRepositoryHandler(t *testing.T, ctx context.Context) {
errNotFound = helper.ErrNotFoundf("repository does not exist")
}
- db := glsql.NewDB(t)
+ db := testdb.New(t)
for _, tc := range []struct {
desc string
routeToGitaly bool
diff --git a/internal/praefect/replicator_pg_test.go b/internal/praefect/replicator_pg_test.go
index c8cea0716..025597a0b 100644
--- a/internal/praefect/replicator_pg_test.go
+++ b/internal/praefect/replicator_pg_test.go
@@ -10,8 +10,8 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/client"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/repository"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"google.golang.org/grpc"
)
@@ -49,7 +49,7 @@ func TestReplicatorInvalidSourceRepository(t *testing.T) {
require.NoError(t, err)
defer testhelper.MustClose(t, targetCC)
- rs := datastore.NewPostgresRepositoryStore(glsql.NewDB(t), nil)
+ rs := datastore.NewPostgresRepositoryStore(testdb.New(t), nil)
require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage-1", "relative-path-1", "relative-path-1", "gitaly-1", nil, nil, true, false))
@@ -76,7 +76,7 @@ func TestReplicatorInvalidSourceRepository(t *testing.T) {
func TestReplicatorDestroy(t *testing.T) {
t.Parallel()
- db := glsql.NewDB(t)
+ db := testdb.New(t)
for _, tc := range []struct {
change datastore.ChangeType
error error
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 2dbf02045..3d13b2c3c 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -28,13 +28,13 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/transactions"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/promtest"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"gitlab.com/gitlab-org/labkit/correlation"
@@ -150,7 +150,7 @@ func TestReplMgr_ProcessBacklog(t *testing.T) {
logger := testhelper.NewDiscardingLogger(t)
loggerHook := test.NewLocal(logger)
- queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)))
+ queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t)))
queue.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) {
cancel() // when it is called we know that replication is finished
return queue.Acknowledge(ctx, state, ids)
@@ -160,7 +160,7 @@ func TestReplMgr_ProcessBacklog(t *testing.T) {
_, err = queue.Enqueue(ctx, events[0])
require.NoError(t, err)
- db := glsql.NewDB(t)
+ db := testdb.New(t)
rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
require.NoError(t, rs.CreateRepository(ctx, repositoryID, conf.VirtualStorages[0].Name, testRepo.GetRelativePath(), testRepo.GetRelativePath(), shard.Primary.GetStorage(), nil, nil, true, false))
@@ -311,7 +311,7 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) {
// unlinkat /tmp/gitaly-222007427/381349228/storages.d/internal-gitaly-1/+gitaly/state/path/to/repo: directory not empty
// By using WaitGroup we are sure the test cleanup will be started after all replication
// requests are completed, so no running cache IO operations happen.
- queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)))
+ queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t)))
var wg sync.WaitGroup
queue.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
wg.Add(1)
@@ -688,7 +688,7 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)))
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t)))
// this job exists to verify that replication works
okJob := datastore.ReplicationJob{
@@ -717,7 +717,7 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
nodeMgr.Start(0, time.Hour)
defer nodeMgr.Stop()
- db := glsql.NewDB(t)
+ db := testdb.New(t)
rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
require.NoError(t, rs.CreateRepository(ctx, okJob.RepositoryID, okJob.VirtualStorage, okJob.RelativePath, okJob.RelativePath, okJob.SourceNodeStorage, nil, nil, true, false))
@@ -795,7 +795,7 @@ func TestProcessBacklog_Success(t *testing.T) {
},
}
- queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)))
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t)))
queueInterceptor.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) {
ackIDs, err := queue.Acknowledge(ctx, state, ids)
if len(ids) > 0 {
@@ -874,7 +874,7 @@ func TestProcessBacklog_Success(t *testing.T) {
nodeMgr.Start(0, time.Hour)
defer nodeMgr.Stop()
- db := glsql.NewDB(t)
+ db := testdb.New(t)
rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
require.NoError(t, rs.CreateRepository(ctx, eventType1.Job.RepositoryID, eventType1.Job.VirtualStorage, eventType1.Job.VirtualStorage, eventType1.Job.RelativePath, eventType1.Job.SourceNodeStorage, nil, nil, true, false))
@@ -921,7 +921,7 @@ func TestReplMgrProcessBacklog_OnlyHealthyNodes(t *testing.T) {
var mtx sync.Mutex
expStorages := map[string]bool{conf.VirtualStorages[0].Nodes[0].Storage: true, conf.VirtualStorages[0].Nodes[2].Storage: true}
- queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)))
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t)))
queueInterceptor.OnDequeue(func(_ context.Context, virtualStorageName string, storageName string, _ int, _ datastore.ReplicationEventQueue) ([]datastore.ReplicationEvent, error) {
select {
case <-ctx.Done():
@@ -1004,7 +1004,7 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) {
},
}
- queue := datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t))
+ queue := datastore.NewPostgresReplicationEventQueue(testdb.New(t))
_, err := queue.Enqueue(ctx, datastore.ReplicationEvent{
Job: datastore.ReplicationJob{
RepositoryID: 1,
@@ -1017,7 +1017,7 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) {
})
require.NoError(t, err)
- db := glsql.NewDB(t)
+ db := testdb.New(t)
rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
require.NoError(t, rs.CreateRepository(ctx, repositoryID, virtualStorage, "ignored", "ignored", primaryStorage, []string{secondaryStorage}, nil, true, false))
diff --git a/internal/praefect/repocleaner/repository_test.go b/internal/praefect/repocleaner/repository_test.go
index 029be5d3a..7acf5f142 100644
--- a/internal/praefect/repocleaner/repository_test.go
+++ b/internal/praefect/repocleaner/repository_test.go
@@ -17,11 +17,11 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/service/transaction"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
)
@@ -48,8 +48,8 @@ func TestRunner_Run(t *testing.T) {
g2Addr := testserver.RunGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
g3Addr := testserver.RunGitalyServer(t, g3Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
- db := glsql.NewDB(t)
- dbConf := glsql.GetDBConfig(t, db.Name)
+ db := testdb.New(t)
+ dbConf := testdb.GetConfig(t, db.Name)
conf := config.Config{
SocketPath: testhelper.GetTemporaryGitalySocketFileName(t),
@@ -197,8 +197,8 @@ func TestRunner_Run_noAvailableStorages(t *testing.T) {
g1Cfg := testcfg.Build(t, testcfg.WithStorages(storage1))
g1Addr := testserver.RunGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
- db := glsql.NewDB(t)
- dbConf := glsql.GetDBConfig(t, db.Name)
+ db := testdb.New(t)
+ dbConf := testdb.GetConfig(t, db.Name)
conf := config.Config{
SocketPath: testhelper.GetTemporaryGitalySocketFileName(t),
diff --git a/internal/praefect/repository_exists_test.go b/internal/praefect/repository_exists_test.go
index 2d7d2fe84..af688dc32 100644
--- a/internal/praefect/repository_exists_test.go
+++ b/internal/praefect/repository_exists_test.go
@@ -9,10 +9,10 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/grpc-proxy/proxy"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@@ -23,7 +23,7 @@ func TestRepositoryExistsHandler(t *testing.T) {
t.Parallel()
errServedByGitaly := status.Error(codes.Unknown, "request passed to Gitaly")
- db := glsql.NewDB(t)
+ db := testdb.New(t)
for _, tc := range []struct {
desc string
routeToGitaly bool
diff --git a/internal/praefect/router_per_repository_test.go b/internal/praefect/router_per_repository_test.go
index 8372422d4..ca2079639 100644
--- a/internal/praefect/router_per_repository_test.go
+++ b/internal/praefect/router_per_repository_test.go
@@ -9,7 +9,6 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
@@ -116,7 +115,7 @@ func TestPerRepositoryRouter_RouteStorageAccessor(t *testing.T) {
func TestPerRepositoryRouter_RouteRepositoryAccessor(t *testing.T) {
t.Parallel()
- db := glsql.NewDB(t)
+ db := testdb.New(t)
const relativePath = "repository"
@@ -269,7 +268,7 @@ func TestPerRepositoryRouter_RouteRepositoryAccessor(t *testing.T) {
func TestPerRepositoryRouter_RouteRepositoryMutator(t *testing.T) {
t.Parallel()
- db := glsql.NewDB(t)
+ db := testdb.New(t)
configuredNodes := map[string][]string{
"virtual-storage-1": {"primary", "secondary-1", "secondary-2"},
@@ -482,7 +481,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
secondary1Conn := &grpc.ClientConn{}
secondary2Conn := &grpc.ClientConn{}
- db := glsql.NewDB(t)
+ db := testdb.New(t)
const relativePath = "relative-path"
diff --git a/internal/praefect/server_factory_test.go b/internal/praefect/server_factory_test.go
index 241ce6f86..d3563f1ef 100644
--- a/internal/praefect/server_factory_test.go
+++ b/internal/praefect/server_factory_test.go
@@ -23,7 +23,6 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/listenmux"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/service/transaction"
@@ -32,6 +31,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/promtest"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"google.golang.org/grpc"
@@ -71,7 +71,7 @@ func TestServerFactory(t *testing.T) {
revision := text.ChompBytes(gittest.Exec(t, cfg, "-C", repoPath, "rev-parse", "HEAD"))
logger := testhelper.NewDiscardingLogEntry(t)
- queue := datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t))
+ queue := datastore.NewPostgresReplicationEventQueue(testdb.New(t))
rs := datastore.MockRepositoryStore{}
txMgr := transactions.NewManager(conf)
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index c54714143..61d718f03 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -31,7 +31,6 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/grpc-proxy/proxy"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/mock"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
@@ -536,7 +535,7 @@ func testRemoveRepository(t *testing.T, ctx context.Context) {
verifyReposExistence(t, codes.OK)
- queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)))
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t)))
repoStore := defaultRepoStore(praefectCfg)
txMgr := defaultTxMgr(praefectCfg)
nodeMgr, err := nodes.NewManager(testhelper.NewDiscardingLogEntry(t), praefectCfg, nil,
@@ -633,9 +632,9 @@ func TestRenameRepository(t *testing.T) {
repoPaths[i] = filepath.Join(gitalyCfg.Storages[0].Path, relativePath)
}
- evq := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)))
+ evq := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t)))
- tx := glsql.NewDB(t).Begin(t)
+ tx := testdb.New(t).Begin(t)
defer tx.Rollback(t)
rs := datastore.NewPostgresRepositoryStore(tx, nil)
@@ -834,7 +833,7 @@ func TestProxyWrites(t *testing.T) {
},
}
- queue := datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t))
+ queue := datastore.NewPostgresReplicationEventQueue(testdb.New(t))
entry := testhelper.NewDiscardingLogEntry(t)
nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
@@ -970,7 +969,7 @@ func TestErrorThreshold(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- queue := datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t))
+ queue := datastore.NewPostgresReplicationEventQueue(testdb.New(t))
entry := testhelper.NewDiscardingLogEntry(t)
testCases := []struct {
diff --git a/internal/testhelper/testdb/db.go b/internal/testhelper/testdb/db.go
index 73c5d39f8..529ebb4a0 100644
--- a/internal/testhelper/testdb/db.go
+++ b/internal/testhelper/testdb/db.go
@@ -2,50 +2,402 @@ package testdb
import (
"context"
+ "database/sql"
+ "errors"
+ "net"
+ "os"
+ "os/exec"
+ "strconv"
+ "strings"
+ "sync"
"testing"
+ "time"
- "github.com/lib/pq"
+ "github.com/google/uuid"
+ migrate "github.com/rubenv/sql-migrate"
+ "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
)
-// SetHealthyNodes sets the healthy nodes in the database as determined by the passed in map. The healthyNodes map is keyed by
-// praefect name -> virtual storage -> storage. On each run, it clears all previous health checks from the table, so the
-// passed in nodes are the only ones considered healthy after the function. As the healthy nodes are determined by the time of
-// the last successful health check, this should be run in the same transastion as the tested query to prevent flakiness.
-//
-//nolint:revive
-func SetHealthyNodes(t testing.TB, ctx context.Context, db glsql.Querier, healthyNodes map[string]map[string][]string) {
- t.Helper()
-
- var praefects, virtualStorages, storages []string
- for praefect, virtualStors := range healthyNodes {
- for virtualStorage, stors := range virtualStors {
- for _, storage := range stors {
- praefects = append(praefects, praefect)
- virtualStorages = append(virtualStorages, virtualStorage)
- storages = append(storages, storage)
- }
- }
+const (
+ advisoryLockIDDatabaseTemplate = 1627644550
+ praefectTemplateDatabase = "praefect_template"
+)
+
+// TxWrapper is a simple wrapper around *sql.Tx.
+type TxWrapper struct {
+ *sql.Tx
+}
+
+// Rollback executes Rollback operation on the wrapped *sql.Tx if it is set.
+// After execution is sets Tx to nil to prevent errors on the repeated invocations (useful
+// for testing when Rollback is deferred).
+func (txw *TxWrapper) Rollback(t testing.TB) {
+ t.Helper()
+ if txw.Tx != nil {
+ require.NoError(t, txw.Tx.Rollback())
+ txw.Tx = nil
+ }
+}
+
+// Commit executes Commit operation on the wrapped *sql.Tx if it is set.
+// After execution is sets Tx to nil to prevent errors on the deferred invocations (useful
+// for testing when Rollback is deferred).
+func (txw *TxWrapper) Commit(t testing.TB) {
+ t.Helper()
+ if txw.Tx != nil {
+ require.NoError(t, txw.Tx.Commit())
+ txw.Tx = nil
+ }
+}
+
+// DB is a helper struct that should be used only for testing purposes.
+type DB struct {
+ *sql.DB
+ // Name is a name of the database.
+ Name string
+}
+
+// Begin starts a new transaction and returns it wrapped into TxWrapper.
+func (db DB) Begin(t testing.TB) *TxWrapper {
+ t.Helper()
+ tx, err := db.DB.Begin()
+ require.NoError(t, err)
+ return &TxWrapper{Tx: tx}
+}
+
+// Truncate removes all data from the list of tables and restarts identities for them.
+func (db DB) Truncate(t testing.TB, tables ...string) {
+ t.Helper()
+
+ for _, table := range tables {
+ _, err := db.DB.Exec("DELETE FROM " + table)
+ require.NoError(t, err, "database cleanup failed: %s", tables)
}
- _, err := db.ExecContext(ctx, `
-WITH clear_previous_checks AS ( DELETE FROM node_status )
-
-INSERT INTO node_status (praefect_name, shard_name, node_name, last_contact_attempt_at, last_seen_active_at)
-SELECT
- unnest($1::text[]) AS praefect_name,
- unnest($2::text[]) AS shard_name,
- unnest($3::text[]) AS node_name,
- NOW() AS last_contact_attempt_at,
- NOW() AS last_seen_active_at
-ON CONFLICT (praefect_name, shard_name, node_name) DO UPDATE SET
- last_contact_attempt_at = NOW(),
- last_seen_active_at = NOW()
- `,
- pq.StringArray(praefects),
- pq.StringArray(virtualStorages),
- pq.StringArray(storages),
+ _, err := db.DB.Exec("SELECT setval(relname::TEXT, 1, false) from pg_class where relkind = 'S'")
+ require.NoError(t, err, "database cleanup failed: %s", tables)
+}
+
+// RequireRowsInTable verifies that `tname` table has `n` amount of rows in it.
+func (db DB) RequireRowsInTable(t *testing.T, tname string, n int) {
+ t.Helper()
+
+ var count int
+ require.NoError(t, db.QueryRow("SELECT COUNT(*) FROM "+tname).Scan(&count))
+ require.Equal(t, n, count, "unexpected amount of rows in table: %d instead of %d", count, n)
+}
+
+// TruncateAll removes all data from known set of tables.
+func (db DB) TruncateAll(t testing.TB) {
+ db.Truncate(t,
+ "replication_queue_job_lock",
+ "replication_queue",
+ "replication_queue_lock",
+ "node_status",
+ "shard_primaries",
+ "storage_repositories",
+ "repositories",
+ "virtual_storages",
+ "repository_assignments",
+ "storage_cleanups",
)
+}
+
+// MustExec executes `q` with `args` and verifies there are no errors.
+func (db DB) MustExec(t testing.TB, q string, args ...interface{}) {
+ _, err := db.DB.Exec(q, args...)
+ require.NoError(t, err)
+}
+
+// Close removes schema if it was used and releases connection pool.
+func (db DB) Close() error {
+ if err := db.DB.Close(); err != nil {
+ return errors.New("failed to release connection pool: " + err.Error())
+ }
+ return nil
+}
+
+// New returns a wrapper around the database connection pool.
+// Must be used only for testing.
+// The new database with empty relations will be created for each call of this function.
+// It uses env vars:
+// PGHOST - required, URL/socket/dir
+// PGPORT - required, binding port
+// PGUSER - optional, user - `$ whoami` would be used if not provided
+// Once the test is completed the database will be dropped on test cleanup execution.
+func New(t testing.TB) DB {
+ t.Helper()
+ database := "praefect_" + strings.ReplaceAll(uuid.New().String(), "-", "")
+ return DB{DB: initPraefectDB(t, database), Name: database}
+}
+
+// GetConfig returns the database configuration determined by
+// environment variables. See NewDB() for the list of variables.
+func GetConfig(t testing.TB, database string) config.DB {
+ env := getDatabaseEnvironment(t)
+
+ require.Contains(t, env, "PGHOST", "PGHOST env var expected to be provided to connect to Postgres database")
+ require.Contains(t, env, "PGPORT", "PGHOST env var expected to be provided to connect to Postgres database")
+
+ portNumber, err := strconv.Atoi(env["PGPORT"])
+ require.NoError(t, err, "PGPORT must be a port number of the Postgres database listens for incoming connections")
+
+ // connect to 'postgres' database first to re-create testing database from scratch
+ conf := config.DB{
+ Host: env["PGHOST"],
+ Port: portNumber,
+ DBName: database,
+ SSLMode: "disable",
+ User: env["PGUSER"],
+ SessionPooled: config.DBConnection{
+ Host: env["PGHOST"],
+ Port: portNumber,
+ },
+ }
+
+ if bouncerHost, ok := env["PGHOST_PGBOUNCER"]; ok {
+ conf.Host = bouncerHost
+ }
+
+ if bouncerPort, ok := env["PGPORT_PGBOUNCER"]; ok {
+ bouncerPortNumber, err := strconv.Atoi(bouncerPort)
+ require.NoError(t, err, "PGPORT_PGBOUNCER must be a port number of the PgBouncer")
+ conf.Port = bouncerPortNumber
+ }
+
+ return conf
+}
+
+func requireSQLOpen(t testing.TB, dbCfg config.DB, direct bool) *sql.DB {
+ t.Helper()
+ db, err := sql.Open("postgres", glsql.DSN(dbCfg, direct))
+ require.NoErrorf(t, err, "failed to connect to %q database", dbCfg.DBName)
+ if !assert.NoErrorf(t, db.Ping(), "failed to communicate with %q database", dbCfg.DBName) {
+ require.NoErrorf(t, db.Close(), "release connection to the %q database", dbCfg.DBName)
+ }
+ return db
+}
+
+func requireTerminateAllConnections(t testing.TB, db *sql.DB, database string) {
+ t.Helper()
+ _, err := db.Exec("SELECT PG_TERMINATE_BACKEND(pid) FROM PG_STAT_ACTIVITY WHERE datname = '" + database + "'")
require.NoError(t, err)
+
+ // Once the pg_terminate_backend has completed, we may need to wait before the connections
+ // are fully released. pg_terminate_backend will return true as long as the signal was
+ // sent successfully, but the backend needs to respond to the signal to close the connection.
+ // TODO: In Postgre 14, pg_terminate_backend takes an optional timeout argument that makes it a blocking
+ // call. https://gitlab.com/gitlab-org/gitaly/-/issues/3937 tracks the refactor work to remove this
+ // require.Eventuallyf call in favor of passing in a timeout to pg_terminate_backend
+ require.Eventuallyf(t, func() bool {
+ var openConnections int
+ require.NoError(t, db.QueryRow(
+ `SELECT COUNT(*) FROM pg_stat_activity
+ WHERE datname = $1 AND pid != pg_backend_pid()`, database).
+ Scan(&openConnections))
+ return openConnections == 0
+ }, 20*time.Second, 10*time.Millisecond, "wait for all connections to be terminated")
+}
+
+func initPraefectDB(t testing.TB, database string) *sql.DB {
+ t.Helper()
+
+ dbCfg := GetConfig(t, "postgres")
+ // We require a direct connection to the Postgres instance and not through the PgBouncer
+ // because we use transaction pool mood for it and it doesn't work well for system advisory locks.
+ postgresDB := requireSQLOpen(t, dbCfg, true)
+ defer func() { require.NoErrorf(t, postgresDB.Close(), "release connection to the %q database", dbCfg.DBName) }()
+
+ // Acquire exclusive advisory lock to prevent other concurrent test from doing the same.
+ _, err := postgresDB.Exec(`SELECT pg_advisory_lock($1)`, advisoryLockIDDatabaseTemplate)
+ require.NoError(t, err, "not able to acquire lock for synchronisation")
+ var advisoryUnlock func()
+ advisoryUnlock = func() {
+ require.True(t, scanSingleBool(t, postgresDB, `SELECT pg_advisory_unlock($1)`, advisoryLockIDDatabaseTemplate), "release advisory lock")
+ advisoryUnlock = func() {}
+ }
+ defer func() { advisoryUnlock() }()
+
+ templateDBExists := databaseExist(t, postgresDB, praefectTemplateDatabase)
+ if !templateDBExists {
+ _, err := postgresDB.Exec("CREATE DATABASE " + praefectTemplateDatabase + " WITH ENCODING 'UTF8'")
+ require.NoErrorf(t, err, "failed to create %q database", praefectTemplateDatabase)
+ }
+
+ templateDBConf := GetConfig(t, praefectTemplateDatabase)
+ templateDB := requireSQLOpen(t, templateDBConf, true)
+ defer func() {
+ require.NoErrorf(t, templateDB.Close(), "release connection to the %q database", templateDBConf.DBName)
+ }()
+
+ if _, err := glsql.Migrate(templateDB, false); err != nil {
+ // If database has unknown migration we try to re-create template database with
+ // current migration. It may be caused by other code changes done in another branch.
+ if pErr := (*migrate.PlanError)(nil); errors.As(err, &pErr) {
+ if strings.EqualFold(pErr.ErrorMessage, "unknown migration in database") {
+ require.NoErrorf(t, templateDB.Close(), "release connection to the %q database", templateDBConf.DBName)
+
+ _, err = postgresDB.Exec("DROP DATABASE " + praefectTemplateDatabase)
+ require.NoErrorf(t, err, "failed to drop %q database", praefectTemplateDatabase)
+ _, err = postgresDB.Exec("CREATE DATABASE " + praefectTemplateDatabase + " WITH ENCODING 'UTF8'")
+ require.NoErrorf(t, err, "failed to create %q database", praefectTemplateDatabase)
+
+ remigrateTemplateDB := requireSQLOpen(t, templateDBConf, true)
+ defer func() {
+ require.NoErrorf(t, remigrateTemplateDB.Close(), "release connection to the %q database", templateDBConf.DBName)
+ }()
+ _, err = glsql.Migrate(remigrateTemplateDB, false)
+ require.NoErrorf(t, err, "failed to run database migration on %q", praefectTemplateDatabase)
+ } else {
+ require.NoErrorf(t, err, "failed to run database migration on %q", praefectTemplateDatabase)
+ }
+ } else {
+ require.NoErrorf(t, err, "failed to run database migration on %q", praefectTemplateDatabase)
+ }
+ }
+
+ // Release advisory lock as soon as possible to unblock other tests from execution.
+ advisoryUnlock()
+
+ require.NoErrorf(t, templateDB.Close(), "release connection to the %q database", templateDBConf.DBName)
+
+ _, err = postgresDB.Exec(`CREATE DATABASE ` + database + ` TEMPLATE ` + praefectTemplateDatabase)
+ require.NoErrorf(t, err, "failed to create %q database", praefectTemplateDatabase)
+
+ t.Cleanup(func() {
+ if _, ok := getDatabaseEnvironment(t)["PGHOST_PGBOUNCER"]; ok {
+ pgbouncerCfg := dbCfg
+ // This database name will connect us to the special admin console.
+ pgbouncerCfg.DBName = "pgbouncer"
+
+ // We cannot use `requireSQLOpen()` because it would ping the database,
+ // which is not supported by the PgBouncer admin console.
+ pgbouncerDB, err := sql.Open("postgres", glsql.DSN(pgbouncerCfg, false))
+ require.NoError(t, err)
+ defer testhelper.MustClose(t, pgbouncerDB)
+
+ // Trying to release connections like we do with the "normal" Postgres
+ // database regularly results in flaky tests with PgBouncer given that the
+ // connections are seemingly never released. Instead, we kill PgBouncer
+ // connections by connecting to its admin console and using the KILL
+ // command, which instructs it to kill all client and server connections.
+ _, err = pgbouncerDB.Exec("KILL " + database)
+ require.NoError(t, err, "killing PgBouncer connections")
+ }
+
+ dbCfg.DBName = "postgres"
+ postgresDB := requireSQLOpen(t, dbCfg, true)
+ defer testhelper.MustClose(t, postgresDB)
+
+ // We need to force-terminate open connections as for the tasks that use PgBouncer
+ // the actual client connected to the database is a PgBouncer and not a test that is
+ // running.
+ requireTerminateAllConnections(t, postgresDB, database)
+
+ _, err = postgresDB.Exec("DROP DATABASE " + database)
+ require.NoErrorf(t, err, "failed to drop %q database", database)
+ })
+
+ // Connect to the testing database with optional PgBouncer
+ dbCfg.DBName = database
+ praefectTestDB := requireSQLOpen(t, dbCfg, false)
+ t.Cleanup(func() {
+ if err := praefectTestDB.Close(); !errors.Is(err, net.ErrClosed) {
+ require.NoErrorf(t, err, "release connection to the %q database", dbCfg.DBName)
+ }
+ })
+ return praefectTestDB
+}
+
+func databaseExist(t testing.TB, db *sql.DB, database string) bool {
+ return scanSingleBool(t, db, `SELECT EXISTS(SELECT * FROM pg_database WHERE datname = $1)`, database)
+}
+
+func scanSingleBool(t testing.TB, db *sql.DB, query string, args ...interface{}) bool {
+ var flag bool
+ row := db.QueryRow(query, args...)
+ require.NoError(t, row.Scan(&flag))
+ return flag
+}
+
+var (
+ // Running `gdk env` takes about 250ms on my system and is thus comparatively slow. When
+ // running with Praefect as proxy, this time adds up and may thus slow down tests by quite a
+ // margin. We thus amortize these costs by only running it once.
+ databaseEnvOnce sync.Once
+ databaseEnv map[string]string
+)
+
+func getDatabaseEnvironment(t testing.TB) map[string]string {
+ databaseEnvOnce.Do(func() {
+ envvars := map[string]string{}
+
+ // We only process output if `gdk env` returned success. If it didn't, we simply assume that
+ // we are not running in a GDK environment and will try to extract variables from the
+ // environment instead.
+ if output, err := exec.Command("gdk", "env").Output(); err == nil {
+ for _, line := range strings.Split(string(output), "\n") {
+ const prefix = "export "
+ if !strings.HasPrefix(line, prefix) {
+ continue
+ }
+
+ split := strings.SplitN(strings.TrimPrefix(line, prefix), "=", 2)
+ if len(split) != 2 {
+ continue
+ }
+
+ envvars[split[0]] = split[1]
+ }
+ }
+
+ for _, key := range []string{"PGHOST", "PGPORT", "PGUSER", "PGHOST_PGBOUNCER", "PGPORT_PGBOUNCER"} {
+ if _, ok := envvars[key]; !ok {
+ value, ok := os.LookupEnv(key)
+ if ok {
+ envvars[key] = value
+ }
+ }
+ }
+
+ databaseEnv = envvars
+ })
+
+ return databaseEnv
+}
+
+// WaitForBlockedQuery is a helper that waits until a blocked query matching the prefix is present in the
+// database. This is useful for ensuring another transaction is blocking a query when testing concurrent
+// execution of multiple queries.
+func WaitForBlockedQuery(ctx context.Context, t testing.TB, db glsql.Querier, queryPrefix string) {
+ t.Helper()
+
+ for {
+ var queryBlocked bool
+ require.NoError(t, db.QueryRowContext(ctx, `
+ SELECT EXISTS (
+ SELECT FROM pg_stat_activity
+ WHERE TRIM(e'\n' FROM query) LIKE $1
+ AND state = 'active'
+ AND wait_event_type = 'Lock'
+ AND datname = current_database()
+ )
+ `, queryPrefix+"%").Scan(&queryBlocked))
+
+ if queryBlocked {
+ return
+ }
+
+ retry := time.NewTimer(time.Millisecond)
+ select {
+ case <-ctx.Done():
+ retry.Stop()
+ return
+ case <-retry.C:
+ }
+ }
}
diff --git a/internal/praefect/datastore/glsql/testing_test.go b/internal/testhelper/testdb/db_test.go
index 26a4f499c..a3687d977 100644
--- a/internal/praefect/datastore/glsql/testing_test.go
+++ b/internal/testhelper/testdb/db_test.go
@@ -1,4 +1,4 @@
-package glsql
+package testdb
import (
"testing"
@@ -8,7 +8,7 @@ import (
func TestDB_Truncate(t *testing.T) {
t.Parallel()
- db := NewDB(t)
+ db := New(t)
_, err := db.Exec("CREATE TABLE truncate_tbl(id BIGSERIAL PRIMARY KEY)")
require.NoError(t, err)
diff --git a/internal/testhelper/testdb/health.go b/internal/testhelper/testdb/health.go
new file mode 100644
index 000000000..73c5d39f8
--- /dev/null
+++ b/internal/testhelper/testdb/health.go
@@ -0,0 +1,51 @@
+package testdb
+
+import (
+ "context"
+ "testing"
+
+ "github.com/lib/pq"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
+)
+
+// SetHealthyNodes sets the healthy nodes in the database as determined by the passed in map. The healthyNodes map is keyed by
+// praefect name -> virtual storage -> storage. On each run, it clears all previous health checks from the table, so the
+// passed in nodes are the only ones considered healthy after the function. As the healthy nodes are determined by the time of
+// the last successful health check, this should be run in the same transastion as the tested query to prevent flakiness.
+//
+//nolint:revive
+func SetHealthyNodes(t testing.TB, ctx context.Context, db glsql.Querier, healthyNodes map[string]map[string][]string) {
+ t.Helper()
+
+ var praefects, virtualStorages, storages []string
+ for praefect, virtualStors := range healthyNodes {
+ for virtualStorage, stors := range virtualStors {
+ for _, storage := range stors {
+ praefects = append(praefects, praefect)
+ virtualStorages = append(virtualStorages, virtualStorage)
+ storages = append(storages, storage)
+ }
+ }
+ }
+
+ _, err := db.ExecContext(ctx, `
+WITH clear_previous_checks AS ( DELETE FROM node_status )
+
+INSERT INTO node_status (praefect_name, shard_name, node_name, last_contact_attempt_at, last_seen_active_at)
+SELECT
+ unnest($1::text[]) AS praefect_name,
+ unnest($2::text[]) AS shard_name,
+ unnest($3::text[]) AS node_name,
+ NOW() AS last_contact_attempt_at,
+ NOW() AS last_seen_active_at
+ON CONFLICT (praefect_name, shard_name, node_name) DO UPDATE SET
+ last_contact_attempt_at = NOW(),
+ last_seen_active_at = NOW()
+ `,
+ pq.StringArray(praefects),
+ pq.StringArray(virtualStorages),
+ pq.StringArray(storages),
+ )
+ require.NoError(t, err)
+}
diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go
index d01b80fc2..0e48d6622 100644
--- a/internal/testhelper/testserver/gitaly.go
+++ b/internal/testhelper/testserver/gitaly.go
@@ -31,10 +31,10 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitlab"
praefectconfig "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/streamcache"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
@@ -85,7 +85,7 @@ func StartGitalyServer(t testing.TB, cfg config.Cfg, rubyServer *rubyserver.Serv
// createDatabase create a new database with randomly generated name and returns it back to the caller.
func createDatabase(t testing.TB) string {
- db := glsql.NewDB(t)
+ db := testdb.New(t)
return db.Name
}
@@ -113,7 +113,7 @@ func runPraefectProxy(t testing.TB, cfg config.Cfg, gitalyAddr, praefectBinPath
Auth: auth.Config{
Token: cfg.Auth.Token,
},
- DB: glsql.GetDBConfig(t, dbName),
+ DB: testdb.GetConfig(t, dbName),
Failover: praefectconfig.Failover{
Enabled: true,
ElectionStrategy: praefectconfig.ElectionStrategyLocal,