diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-12-16 11:20:11 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-12-16 11:20:11 +0300 |
commit | f5ca110dbbc1620b1fd5386e432248570b103c66 (patch) | |
tree | fcf015a5968e31099d3b47fc42c28828bfb2e352 | |
parent | 152dd3c663c73f79db4c1ff0f63b9011316e35a0 (diff) | |
parent | 689073bfa187427bdbd30ad3eceacd7e6678cc6a (diff) |
Merge branch 'pks-pgbouncer-kill-connections' into 'master'
testdb: Fix stuck PgBouncer connections by killing via admin command
See merge request gitlab-org/gitaly!4193
45 files changed, 610 insertions, 611 deletions
diff --git a/cmd/praefect/main_test.go b/cmd/praefect/main_test.go index 25a615876..9f789bf24 100644 --- a/cmd/praefect/main_test.go +++ b/cmd/praefect/main_test.go @@ -13,8 +13,8 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/bootstrap/starter" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" ) func TestMain(m *testing.M) { @@ -190,8 +190,8 @@ func (m *mockRegisterer) Gather() ([]*dto.MetricFamily, error) { func TestExcludeDatabaseMetricsFromDefaultMetrics(t *testing.T) { t.Parallel() - db := glsql.NewDB(t) - dbConf := glsql.GetDBConfig(t, db.Name) + db := testdb.New(t) + dbConf := testdb.GetConfig(t, db.Name) conf := config.Config{ SocketPath: testhelper.GetTemporaryGitalySocketFileName(t), diff --git a/cmd/praefect/subcmd_accept_dataloss_test.go b/cmd/praefect/subcmd_accept_dataloss_test.go index ef80069b7..8ebc3927a 100644 --- a/cmd/praefect/subcmd_accept_dataloss_test.go +++ b/cmd/praefect/subcmd_accept_dataloss_test.go @@ -7,9 +7,9 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/service/info" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" ) @@ -35,7 +35,7 @@ func TestAcceptDatalossSubcommand(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - db := glsql.NewDB(t) + db := testdb.New(t) rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) startingGenerations := map[string]int{st1: 1, st2: 0, st3: datastore.GenerationUnknown} diff --git a/cmd/praefect/subcmd_dataloss_test.go b/cmd/praefect/subcmd_dataloss_test.go index b4fe18eb6..b46bef8dd 100644 --- a/cmd/praefect/subcmd_dataloss_test.go +++ b/cmd/praefect/subcmd_dataloss_test.go @@ -7,7 +7,6 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/service/info" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" @@ -42,7 +41,7 @@ func TestDatalossSubcommand(t *testing.T) { }, } - tx := glsql.NewDB(t).Begin(t) + tx := testdb.New(t).Begin(t) defer tx.Rollback(t) ctx, cancel := testhelper.Context() diff --git a/cmd/praefect/subcmd_list_untracked_repositories_test.go b/cmd/praefect/subcmd_list_untracked_repositories_test.go index 527a3219c..c54ecd9ea 100644 --- a/cmd/praefect/subcmd_list_untracked_repositories_test.go +++ b/cmd/praefect/subcmd_list_untracked_repositories_test.go @@ -17,9 +17,9 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" ) @@ -64,10 +64,10 @@ func TestListUntrackedRepositories_Exec(t *testing.T) { g1Addr := testserver.RunGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) g2Addr := testserver.RunGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) - db := glsql.NewDB(t) + db := testdb.New(t) var database string require.NoError(t, db.QueryRow(`SELECT current_database()`).Scan(&database)) - dbConf := glsql.GetDBConfig(t, database) + dbConf := testdb.GetConfig(t, database) conf := config.Config{ SocketPath: testhelper.GetTemporaryGitalySocketFileName(t), diff --git a/cmd/praefect/subcmd_metadata_test.go b/cmd/praefect/subcmd_metadata_test.go index dc5cd2b60..3432457cf 100644 --- a/cmd/praefect/subcmd_metadata_test.go +++ b/cmd/praefect/subcmd_metadata_test.go @@ -9,7 +9,6 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/service/info" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" @@ -23,7 +22,7 @@ func TestMetadataSubcommand(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - tx := glsql.NewDB(t).Begin(t) + tx := testdb.New(t).Begin(t) defer tx.Rollback(t) testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{ diff --git a/cmd/praefect/subcmd_remove_repository_test.go b/cmd/praefect/subcmd_remove_repository_test.go index bf2ece389..a1a4f6cbc 100644 --- a/cmd/praefect/subcmd_remove_repository_test.go +++ b/cmd/praefect/subcmd_remove_repository_test.go @@ -20,9 +20,9 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" ) @@ -75,8 +75,8 @@ func TestRemoveRepository_Exec(t *testing.T) { g1Addr := testserver.RunGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) g2Srv := testserver.StartGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) - db := glsql.NewDB(t) - dbConf := glsql.GetDBConfig(t, db.Name) + db := testdb.New(t) + dbConf := testdb.GetConfig(t, db.Name) conf := config.Config{ SocketPath: testhelper.GetTemporaryGitalySocketFileName(t), @@ -233,7 +233,7 @@ func TestRemoveRepository_Exec(t *testing.T) { <-stopped } -func requireNoDatabaseInfo(t *testing.T, db glsql.DB, cmd *removeRepository) { +func requireNoDatabaseInfo(t *testing.T, db testdb.DB, cmd *removeRepository) { t.Helper() var repositoryRowExists bool require.NoError(t, db.QueryRow( @@ -259,7 +259,7 @@ func TestRemoveRepository_removeReplicationEvents(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - db := glsql.NewDB(t) + db := testdb.New(t) queue := datastore.NewPostgresReplicationEventQueue(db) diff --git a/cmd/praefect/subcmd_set_replication_factor_test.go b/cmd/praefect/subcmd_set_replication_factor_test.go index 6e0d0e3e0..03c469844 100644 --- a/cmd/praefect/subcmd_set_replication_factor_test.go +++ b/cmd/praefect/subcmd_set_replication_factor_test.go @@ -8,16 +8,16 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/praefect" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/service/info" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) func TestSetReplicationFactorSubcommand(t *testing.T) { t.Parallel() - db := glsql.NewDB(t) + db := testdb.New(t) for _, tc := range []struct { desc string diff --git a/cmd/praefect/subcmd_track_repository_test.go b/cmd/praefect/subcmd_track_repository_test.go index ed55c1ce7..e1e08a3c3 100644 --- a/cmd/praefect/subcmd_track_repository_test.go +++ b/cmd/praefect/subcmd_track_repository_test.go @@ -15,12 +15,12 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/promtest" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" ) @@ -86,8 +86,8 @@ func TestAddRepository_Exec(t *testing.T) { g1Addr := g1Srv.Address() - db := glsql.NewDB(t) - dbConf := glsql.GetDBConfig(t, db.Name) + db := testdb.New(t) + dbConf := testdb.GetConfig(t, db.Name) virtualStorageName := "praefect" conf := config.Config{ diff --git a/internal/backup/backup_test.go b/internal/backup/backup_test.go index 9031cf83e..65abe05f8 100644 --- a/internal/backup/backup_test.go +++ b/internal/backup/backup_test.go @@ -20,9 +20,9 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" praefectConfig "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "google.golang.org/protobuf/proto" @@ -285,8 +285,8 @@ func TestManager_Restore_praefect(t *testing.T) { gitalyAddr := testserver.RunGitalyServer(t, gitalyCfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) - db := glsql.NewDB(t) - dbConf := glsql.GetDBConfig(t, db.Name) + db := testdb.New(t) + dbConf := testdb.GetConfig(t, db.Name) conf := praefectConfig.Config{ SocketPath: testhelper.GetTemporaryGitalySocketFileName(t), diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go index af610e240..4dfd6ad8a 100644 --- a/internal/praefect/auth_test.go +++ b/internal/praefect/auth_test.go @@ -11,13 +11,13 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config/auth" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/mock" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/transactions" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/promtest" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -149,7 +149,7 @@ func runServer(t *testing.T, token string, required bool) (*grpc.Server, string, }, } logEntry := testhelper.NewDiscardingLogEntry(t) - queue := datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)) + queue := datastore.NewPostgresReplicationEventQueue(testdb.New(t)) nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil) require.NoError(t, err) diff --git a/internal/praefect/checks_test.go b/internal/praefect/checks_test.go index 5b1481ba4..c259db08d 100644 --- a/internal/praefect/checks_test.go +++ b/internal/praefect/checks_test.go @@ -16,7 +16,6 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/migrations" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" @@ -72,8 +71,8 @@ func TestPraefectMigrations_success(t *testing.T) { defer cancel() var cfg config.Config - db := glsql.NewDB(t) - cfg.DB = glsql.GetDBConfig(t, db.Name) + db := testdb.New(t) + cfg.DB = testdb.GetConfig(t, db.Name) require.NoError(t, tc.prepare(cfg)) @@ -358,20 +357,20 @@ func runNodes(t *testing.T, nodes []nodeAssertion) ([]*config.Node, func()) { func TestPostgresReadWriteCheck(t *testing.T) { testCases := []struct { desc string - setup func(t *testing.T, db glsql.DB) config.DB + setup func(t *testing.T, db testdb.DB) config.DB expectedErr string expectedLog string }{ { desc: "read and write work", - setup: func(t *testing.T, db glsql.DB) config.DB { - return glsql.GetDBConfig(t, db.Name) + setup: func(t *testing.T, db testdb.DB) config.DB { + return testdb.GetConfig(t, db.Name) }, expectedLog: "successfully read from database\nsuccessfully wrote to database\n", }, { desc: "read only", - setup: func(t *testing.T, db glsql.DB) config.DB { + setup: func(t *testing.T, db testdb.DB) config.DB { role := "praefect_ro_role_" + strings.ReplaceAll(uuid.New().String(), "-", "") _, err := db.Exec(fmt.Sprintf(` @@ -386,7 +385,7 @@ func TestPostgresReadWriteCheck(t *testing.T) { require.NoError(t, err) }) - dbCfg := glsql.GetDBConfig(t, db.Name) + dbCfg := testdb.GetConfig(t, db.Name) dbCfg.User = role dbCfg.Password = "" @@ -402,7 +401,7 @@ func TestPostgresReadWriteCheck(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - db := glsql.NewDB(t) + db := testdb.New(t) t.Cleanup(func() { require.NoError(t, db.Close()) }) dbConf := tc.setup(t, db) @@ -475,8 +474,8 @@ func TestNewUnavailableReposCheck(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - db := glsql.NewDB(t) - dbCfg := glsql.GetDBConfig(t, db.Name) + db := testdb.New(t) + dbCfg := testdb.GetConfig(t, db.Name) conf.DB = dbCfg rs := datastore.NewPostgresRepositoryStore(db, nil) diff --git a/internal/praefect/coordinator_pg_test.go b/internal/praefect/coordinator_pg_test.go index 3eb70b8d0..ee7a5e6ab 100644 --- a/internal/praefect/coordinator_pg_test.go +++ b/internal/praefect/coordinator_pg_test.go @@ -12,7 +12,6 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/transactions" @@ -154,7 +153,7 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { }, } - db := glsql.NewDB(t) + db := testdb.New(t) for _, tc := range testcases { t.Run(tc.desc, func(t *testing.T) { diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 70dc062aa..31b913c60 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -28,7 +28,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/grpc-proxy/proxy" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/mock" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes" @@ -65,7 +64,7 @@ func TestSecondaryRotation(t *testing.T) { func TestStreamDirectorReadOnlyEnforcement(t *testing.T) { t.Parallel() - db := glsql.NewDB(t) + db := testdb.New(t) for _, tc := range []struct { desc string readOnly bool @@ -157,7 +156,7 @@ func TestStreamDirectorMutator(t *testing.T) { }, }, } - db := glsql.NewDB(t) + db := testdb.New(t) targetRepo := gitalypb.Repository{ StorageName: "praefect", @@ -334,7 +333,7 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) { txMgr := transactions.NewManager(conf) coordinator := NewCoordinator( - datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)), + datastore.NewPostgresReplicationEventQueue(testdb.New(t)), rs, NewNodeManagerRouter(nodeMgr, rs), txMgr, @@ -427,7 +426,7 @@ func TestStreamDirectorAccessor(t *testing.T) { }, } - queue := datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)) + queue := datastore.NewPostgresReplicationEventQueue(testdb.New(t)) targetRepo := gitalypb.Repository{ StorageName: "praefect", @@ -534,7 +533,7 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) { }, } - queue := datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)) + queue := datastore.NewPostgresReplicationEventQueue(testdb.New(t)) targetRepo := gitalypb.Repository{ StorageName: "praefect", @@ -821,7 +820,7 @@ func TestRewrittenRepositoryMessage(t *testing.T) { func TestStreamDirector_repo_creation(t *testing.T) { t.Parallel() - db := glsql.NewDB(t) + db := testdb.New(t) for _, tc := range []struct { desc string @@ -1097,7 +1096,7 @@ func TestAbsentCorrelationID(t *testing.T) { }, } - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t))) + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) { assert.True(t, len(queueInterceptor.GetEnqueued()) < 2, "expected only one event to be created") return queue.Enqueue(ctx, event) diff --git a/internal/praefect/datastore/assignment_test.go b/internal/praefect/datastore/assignment_test.go index faa5691da..3de909a37 100644 --- a/internal/praefect/datastore/assignment_test.go +++ b/internal/praefect/datastore/assignment_test.go @@ -8,8 +8,8 @@ import ( "github.com/lib/pq" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" ) func TestAssignmentStore_GetHostAssignments(t *testing.T) { @@ -20,7 +20,7 @@ func TestAssignmentStore_GetHostAssignments(t *testing.T) { storage string } - db := glsql.NewDB(t) + db := testdb.New(t) configuredStorages := []string{"storage-1", "storage-2", "storage-3"} for _, tc := range []struct { @@ -131,7 +131,7 @@ func TestAssignmentStore_SetReplicationFactor(t *testing.T) { } } - db := glsql.NewDB(t) + db := testdb.New(t) for _, tc := range []struct { desc string diff --git a/internal/praefect/datastore/collector_test.go b/internal/praefect/datastore/collector_test.go index 861fe3998..7afb09cd0 100644 --- a/internal/praefect/datastore/collector_test.go +++ b/internal/praefect/datastore/collector_test.go @@ -15,7 +15,6 @@ import ( "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" ) @@ -36,7 +35,7 @@ func TestRepositoryStoreCollector(t *testing.T) { replicas replicas } - db := glsql.NewDB(t) + db := testdb.New(t) for _, tc := range []struct { desc string diff --git a/internal/praefect/datastore/glsql/mock.go b/internal/praefect/datastore/glsql/mock.go deleted file mode 100644 index ccd1ddb85..000000000 --- a/internal/praefect/datastore/glsql/mock.go +++ /dev/null @@ -1,23 +0,0 @@ -package glsql - -import ( - "context" - "database/sql" -) - -// MockQuerier allows for mocking database operations out. -type MockQuerier struct { - ExecContextFunc func(context.Context, string, ...interface{}) (sql.Result, error) - QueryContextFunc func(context.Context, string, ...interface{}) (*sql.Rows, error) - Querier -} - -// ExecContext runs the mock's ExecContextFunc. -func (m MockQuerier) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) { - return m.ExecContextFunc(ctx, query, args...) -} - -// QueryContext runs the mock's QueryContextFunc. -func (m MockQuerier) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) { - return m.QueryContextFunc(ctx, query, args...) -} diff --git a/internal/praefect/datastore/glsql/postgres_test.go b/internal/praefect/datastore/glsql/postgres_test.go index 590627b21..c5f551695 100644 --- a/internal/praefect/datastore/glsql/postgres_test.go +++ b/internal/praefect/datastore/glsql/postgres_test.go @@ -1,4 +1,4 @@ -package glsql +package glsql_test import ( "net" @@ -7,18 +7,20 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" ) func TestOpenDB(t *testing.T) { - dbCfg := GetDBConfig(t, "postgres") + dbCfg := testdb.GetConfig(t, "postgres") ctx, cancel := testhelper.Context() defer cancel() t.Run("failed to ping because of incorrect config", func(t *testing.T) { badCfg := dbCfg badCfg.Host = "not-existing.com" - _, err := OpenDB(ctx, badCfg) + _, err := glsql.OpenDB(ctx, badCfg) require.Error(t, err) // Locally the error looks like: // send ping: dial tcp: lookup not-existing.com: no such host @@ -39,21 +41,21 @@ func TestOpenDB(t *testing.T) { ctx, cancel := testhelper.Context() cancel() - _, err = OpenDB(ctx, badCfg) + _, err = glsql.OpenDB(ctx, badCfg) require.EqualError(t, err, "context canceled") duration := time.Since(start) require.Truef(t, duration < time.Second, "connection attempt took %s", duration.String()) }) t.Run("connected with proper config", func(t *testing.T) { - db, err := OpenDB(ctx, dbCfg) + db, err := glsql.OpenDB(ctx, dbCfg) require.NoError(t, err, "opening of DB with correct configuration must not fail") require.NoError(t, db.Close()) }) } func TestUint64Provider(t *testing.T) { - var provider Uint64Provider + var provider glsql.Uint64Provider dst1 := provider.To() require.Equal(t, []interface{}{new(uint64)}, dst1, "must be a single value holder") @@ -76,21 +78,25 @@ func TestUint64Provider(t *testing.T) { func TestScanAll(t *testing.T) { t.Parallel() - db := NewDB(t) + db := testdb.New(t) - var ids Uint64Provider + var ids glsql.Uint64Provider notEmptyRows, err := db.Query("SELECT id FROM (VALUES (1), (200), (300500)) AS t(id)") require.NoError(t, err) + defer func() { require.NoError(t, notEmptyRows.Close()) }() - require.NoError(t, ScanAll(notEmptyRows, &ids)) + require.NoError(t, glsql.ScanAll(notEmptyRows, &ids)) require.Equal(t, []uint64{1, 200, 300500}, ids.Values()) + require.NoError(t, notEmptyRows.Err()) - var nothing Uint64Provider + var nothing glsql.Uint64Provider emptyRows, err := db.Query("SELECT id FROM (VALUES (1), (200), (300500)) AS t(id) WHERE id < 0") require.NoError(t, err) + defer func() { require.NoError(t, emptyRows.Close()) }() - require.NoError(t, ScanAll(emptyRows, ¬hing)) + require.NoError(t, glsql.ScanAll(emptyRows, ¬hing)) require.Equal(t, ([]uint64)(nil), nothing.Values()) + require.NoError(t, emptyRows.Err()) } func TestDSN(t *testing.T) { @@ -209,7 +215,7 @@ func TestDSN(t *testing.T) { for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - require.Equal(t, tc.out, DSN(tc.in, tc.direct)) + require.Equal(t, tc.out, glsql.DSN(tc.in, tc.direct)) }) } } diff --git a/internal/praefect/datastore/glsql/testing.go b/internal/praefect/datastore/glsql/testing.go deleted file mode 100644 index bced998d2..000000000 --- a/internal/praefect/datastore/glsql/testing.go +++ /dev/null @@ -1,381 +0,0 @@ -package glsql - -import ( - "context" - "database/sql" - "errors" - "net" - "os" - "os/exec" - "strconv" - "strings" - "sync" - "testing" - "time" - - "github.com/google/uuid" - migrate "github.com/rubenv/sql-migrate" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" -) - -const ( - advisoryLockIDDatabaseTemplate = 1627644550 - praefectTemplateDatabase = "praefect_template" -) - -// TxWrapper is a simple wrapper around *sql.Tx. -type TxWrapper struct { - *sql.Tx -} - -// Rollback executes Rollback operation on the wrapped *sql.Tx if it is set. -// After execution is sets Tx to nil to prevent errors on the repeated invocations (useful -// for testing when Rollback is deferred). -func (txw *TxWrapper) Rollback(t testing.TB) { - t.Helper() - if txw.Tx != nil { - require.NoError(t, txw.Tx.Rollback()) - txw.Tx = nil - } -} - -// Commit executes Commit operation on the wrapped *sql.Tx if it is set. -// After execution is sets Tx to nil to prevent errors on the deferred invocations (useful -// for testing when Rollback is deferred). -func (txw *TxWrapper) Commit(t testing.TB) { - t.Helper() - if txw.Tx != nil { - require.NoError(t, txw.Tx.Commit()) - txw.Tx = nil - } -} - -// DB is a helper struct that should be used only for testing purposes. -type DB struct { - *sql.DB - // Name is a name of the database. - Name string -} - -// Begin starts a new transaction and returns it wrapped into TxWrapper. -func (db DB) Begin(t testing.TB) *TxWrapper { - t.Helper() - tx, err := db.DB.Begin() - require.NoError(t, err) - return &TxWrapper{Tx: tx} -} - -// Truncate removes all data from the list of tables and restarts identities for them. -func (db DB) Truncate(t testing.TB, tables ...string) { - t.Helper() - - for _, table := range tables { - _, err := db.DB.Exec("DELETE FROM " + table) - require.NoError(t, err, "database cleanup failed: %s", tables) - } - - _, err := db.DB.Exec("SELECT setval(relname::TEXT, 1, false) from pg_class where relkind = 'S'") - require.NoError(t, err, "database cleanup failed: %s", tables) -} - -// RequireRowsInTable verifies that `tname` table has `n` amount of rows in it. -func (db DB) RequireRowsInTable(t *testing.T, tname string, n int) { - t.Helper() - - var count int - require.NoError(t, db.QueryRow("SELECT COUNT(*) FROM "+tname).Scan(&count)) - require.Equal(t, n, count, "unexpected amount of rows in table: %d instead of %d", count, n) -} - -// TruncateAll removes all data from known set of tables. -func (db DB) TruncateAll(t testing.TB) { - db.Truncate(t, - "replication_queue_job_lock", - "replication_queue", - "replication_queue_lock", - "node_status", - "shard_primaries", - "storage_repositories", - "repositories", - "virtual_storages", - "repository_assignments", - "storage_cleanups", - ) -} - -// MustExec executes `q` with `args` and verifies there are no errors. -func (db DB) MustExec(t testing.TB, q string, args ...interface{}) { - _, err := db.DB.Exec(q, args...) - require.NoError(t, err) -} - -// Close removes schema if it was used and releases connection pool. -func (db DB) Close() error { - if err := db.DB.Close(); err != nil { - return errors.New("failed to release connection pool: " + err.Error()) - } - return nil -} - -// NewDB returns a wrapper around the database connection pool. -// Must be used only for testing. -// The new database with empty relations will be created for each call of this function. -// It uses env vars: -// PGHOST - required, URL/socket/dir -// PGPORT - required, binding port -// PGUSER - optional, user - `$ whoami` would be used if not provided -// Once the test is completed the database will be dropped on test cleanup execution. -func NewDB(t testing.TB) DB { - t.Helper() - database := "praefect_" + strings.ReplaceAll(uuid.New().String(), "-", "") - return DB{DB: initPraefectTestDB(t, database), Name: database} -} - -// GetDBConfig returns the database configuration determined by -// environment variables. See NewDB() for the list of variables. -func GetDBConfig(t testing.TB, database string) config.DB { - env := getDatabaseEnvironment(t) - - require.Contains(t, env, "PGHOST", "PGHOST env var expected to be provided to connect to Postgres database") - require.Contains(t, env, "PGPORT", "PGHOST env var expected to be provided to connect to Postgres database") - - portNumber, err := strconv.Atoi(env["PGPORT"]) - require.NoError(t, err, "PGPORT must be a port number of the Postgres database listens for incoming connections") - - // connect to 'postgres' database first to re-create testing database from scratch - conf := config.DB{ - Host: env["PGHOST"], - Port: portNumber, - DBName: database, - SSLMode: "disable", - User: env["PGUSER"], - SessionPooled: config.DBConnection{ - Host: env["PGHOST"], - Port: portNumber, - }, - } - - if bouncerHost, ok := env["PGHOST_PGBOUNCER"]; ok { - conf.Host = bouncerHost - } - - if bouncerPort, ok := env["PGPORT_PGBOUNCER"]; ok { - bouncerPortNumber, err := strconv.Atoi(bouncerPort) - require.NoError(t, err, "PGPORT_PGBOUNCER must be a port number of the PgBouncer") - conf.Port = bouncerPortNumber - } - - return conf -} - -func requireSQLOpen(t testing.TB, dbCfg config.DB, direct bool) *sql.DB { - t.Helper() - db, err := sql.Open("postgres", DSN(dbCfg, direct)) - require.NoErrorf(t, err, "failed to connect to %q database", dbCfg.DBName) - if !assert.NoErrorf(t, db.Ping(), "failed to communicate with %q database", dbCfg.DBName) { - require.NoErrorf(t, db.Close(), "release connection to the %q database", dbCfg.DBName) - } - return db -} - -func requireTerminateAllConnections(t testing.TB, db *sql.DB, database string) { - t.Helper() - _, err := db.Exec("SELECT PG_TERMINATE_BACKEND(pid) FROM PG_STAT_ACTIVITY WHERE datname = '" + database + "'") - require.NoError(t, err) - - // Once the pg_terminate_backend has completed, we may need to wait before the connections - // are fully released. pg_terminate_backend will return true as long as the signal was - // sent successfully, but the backend needs to respond to the signal to close the connection. - // TODO: In Postgre 14, pg_terminate_backend takes an optional timeout argument that makes it a blocking - // call. https://gitlab.com/gitlab-org/gitaly/-/issues/3937 tracks the refactor work to remove this - // require.Eventuallyf call in favor of passing in a timeout to pg_terminate_backend - require.Eventuallyf(t, func() bool { - var openConnections int - require.NoError(t, db.QueryRow( - `SELECT COUNT(*) FROM pg_stat_activity - WHERE datname = $1 AND pid != pg_backend_pid()`, database). - Scan(&openConnections)) - return openConnections == 0 - }, 20*time.Second, 10*time.Millisecond, "wait for all connections to be terminated") -} - -func initPraefectTestDB(t testing.TB, database string) *sql.DB { - t.Helper() - - dbCfg := GetDBConfig(t, "postgres") - // We require a direct connection to the Postgres instance and not through the PgBouncer - // because we use transaction pool mood for it and it doesn't work well for system advisory locks. - postgresDB := requireSQLOpen(t, dbCfg, true) - defer func() { require.NoErrorf(t, postgresDB.Close(), "release connection to the %q database", dbCfg.DBName) }() - - // Acquire exclusive advisory lock to prevent other concurrent test from doing the same. - _, err := postgresDB.Exec(`SELECT pg_advisory_lock($1)`, advisoryLockIDDatabaseTemplate) - require.NoError(t, err, "not able to acquire lock for synchronisation") - var advisoryUnlock func() - advisoryUnlock = func() { - require.True(t, scanSingleBool(t, postgresDB, `SELECT pg_advisory_unlock($1)`, advisoryLockIDDatabaseTemplate), "release advisory lock") - advisoryUnlock = func() {} - } - defer func() { advisoryUnlock() }() - - templateDBExists := databaseExist(t, postgresDB, praefectTemplateDatabase) - if !templateDBExists { - _, err := postgresDB.Exec("CREATE DATABASE " + praefectTemplateDatabase + " WITH ENCODING 'UTF8'") - require.NoErrorf(t, err, "failed to create %q database", praefectTemplateDatabase) - } - - templateDBConf := GetDBConfig(t, praefectTemplateDatabase) - templateDB := requireSQLOpen(t, templateDBConf, true) - defer func() { - require.NoErrorf(t, templateDB.Close(), "release connection to the %q database", templateDBConf.DBName) - }() - - if _, err := Migrate(templateDB, false); err != nil { - // If database has unknown migration we try to re-create template database with - // current migration. It may be caused by other code changes done in another branch. - if pErr := (*migrate.PlanError)(nil); errors.As(err, &pErr) { - if strings.EqualFold(pErr.ErrorMessage, "unknown migration in database") { - require.NoErrorf(t, templateDB.Close(), "release connection to the %q database", templateDBConf.DBName) - - _, err = postgresDB.Exec("DROP DATABASE " + praefectTemplateDatabase) - require.NoErrorf(t, err, "failed to drop %q database", praefectTemplateDatabase) - _, err = postgresDB.Exec("CREATE DATABASE " + praefectTemplateDatabase + " WITH ENCODING 'UTF8'") - require.NoErrorf(t, err, "failed to create %q database", praefectTemplateDatabase) - - remigrateTemplateDB := requireSQLOpen(t, templateDBConf, true) - defer func() { - require.NoErrorf(t, remigrateTemplateDB.Close(), "release connection to the %q database", templateDBConf.DBName) - }() - _, err = Migrate(remigrateTemplateDB, false) - require.NoErrorf(t, err, "failed to run database migration on %q", praefectTemplateDatabase) - } else { - require.NoErrorf(t, err, "failed to run database migration on %q", praefectTemplateDatabase) - } - } else { - require.NoErrorf(t, err, "failed to run database migration on %q", praefectTemplateDatabase) - } - } - - // Release advisory lock as soon as possible to unblock other tests from execution. - advisoryUnlock() - - require.NoErrorf(t, templateDB.Close(), "release connection to the %q database", templateDBConf.DBName) - - _, err = postgresDB.Exec(`CREATE DATABASE ` + database + ` TEMPLATE ` + praefectTemplateDatabase) - require.NoErrorf(t, err, "failed to create %q database", praefectTemplateDatabase) - - t.Cleanup(func() { - dbCfg.DBName = "postgres" - postgresDB := requireSQLOpen(t, dbCfg, true) - defer func() { require.NoErrorf(t, postgresDB.Close(), "release connection to the %q database", dbCfg.DBName) }() - - // We need to force-terminate open connections as for the tasks that use PgBouncer - // the actual client connected to the database is a PgBouncer and not a test that is - // running. - requireTerminateAllConnections(t, postgresDB, database) - - _, err = postgresDB.Exec("DROP DATABASE " + database) - require.NoErrorf(t, err, "failed to drop %q database", database) - }) - - // Connect to the testing database with optional PgBouncer - dbCfg.DBName = database - praefectTestDB := requireSQLOpen(t, dbCfg, false) - t.Cleanup(func() { - if err := praefectTestDB.Close(); !errors.Is(err, net.ErrClosed) { - require.NoErrorf(t, err, "release connection to the %q database", dbCfg.DBName) - } - }) - return praefectTestDB -} - -func databaseExist(t testing.TB, db *sql.DB, database string) bool { - return scanSingleBool(t, db, `SELECT EXISTS(SELECT * FROM pg_database WHERE datname = $1)`, database) -} - -func scanSingleBool(t testing.TB, db *sql.DB, query string, args ...interface{}) bool { - var flag bool - row := db.QueryRow(query, args...) - require.NoError(t, row.Scan(&flag)) - return flag -} - -var ( - // Running `gdk env` takes about 250ms on my system and is thus comparatively slow. When - // running with Praefect as proxy, this time adds up and may thus slow down tests by quite a - // margin. We thus amortize these costs by only running it once. - databaseEnvOnce sync.Once - databaseEnv map[string]string -) - -func getDatabaseEnvironment(t testing.TB) map[string]string { - databaseEnvOnce.Do(func() { - envvars := map[string]string{} - - // We only process output if `gdk env` returned success. If it didn't, we simply assume that - // we are not running in a GDK environment and will try to extract variables from the - // environment instead. - if output, err := exec.Command("gdk", "env").Output(); err == nil { - for _, line := range strings.Split(string(output), "\n") { - const prefix = "export " - if !strings.HasPrefix(line, prefix) { - continue - } - - split := strings.SplitN(strings.TrimPrefix(line, prefix), "=", 2) - if len(split) != 2 { - continue - } - - envvars[split[0]] = split[1] - } - } - - for _, key := range []string{"PGHOST", "PGPORT", "PGUSER", "PGHOST_PGBOUNCER", "PGPORT_PGBOUNCER"} { - if _, ok := envvars[key]; !ok { - value, ok := os.LookupEnv(key) - if ok { - envvars[key] = value - } - } - } - - databaseEnv = envvars - }) - - return databaseEnv -} - -// WaitForBlockedQuery is a helper that waits until a blocked query matching the prefix is present in the -// database. This is useful for ensuring another transaction is blocking a query when testing concurrent -// execution of multiple queries. -func WaitForBlockedQuery(ctx context.Context, t testing.TB, db Querier, queryPrefix string) { - t.Helper() - - for { - var queryBlocked bool - require.NoError(t, db.QueryRowContext(ctx, ` - SELECT EXISTS ( - SELECT FROM pg_stat_activity - WHERE TRIM(e'\n' FROM query) LIKE $1 - AND state = 'active' - AND wait_event_type = 'Lock' - AND datname = current_database() - ) - `, queryPrefix+"%").Scan(&queryBlocked)) - - if queryBlocked { - return - } - - retry := time.NewTimer(time.Millisecond) - select { - case <-ctx.Done(): - retry.Stop() - return - case <-retry.C: - } - } -} diff --git a/internal/praefect/datastore/listener_postgres_test.go b/internal/praefect/datastore/listener_postgres_test.go index 96bed5f9f..26013ebd6 100644 --- a/internal/praefect/datastore/listener_postgres_test.go +++ b/internal/praefect/datastore/listener_postgres_test.go @@ -17,6 +17,7 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" ) func TestNewPostgresListener(t *testing.T) { @@ -85,13 +86,13 @@ func (mlh mockListenHandler) Connected() { func TestPostgresListener_Listen(t *testing.T) { t.Parallel() - db := glsql.NewDB(t) + db := testdb.New(t) logger := testhelper.NewDiscardingLogger(t) newOpts := func() PostgresListenerOpts { opts := DefaultPostgresListenerOpts - opts.Addr = glsql.DSN(glsql.GetDBConfig(t, db.Name), true) + opts.Addr = glsql.DSN(testdb.GetConfig(t, db.Name), true) opts.MinReconnectInterval = time.Nanosecond opts.MaxReconnectInterval = time.Minute return opts @@ -367,7 +368,7 @@ func requireEqualNotificationEntries(t *testing.T, d string, entries []notificat func TestPostgresListener_Listen_repositories_delete(t *testing.T) { t.Parallel() - db := glsql.NewDB(t) + db := testdb.New(t) const channel = "repositories_updates" @@ -400,7 +401,7 @@ func TestPostgresListener_Listen_repositories_delete(t *testing.T) { func TestPostgresListener_Listen_storage_repositories_insert(t *testing.T) { t.Parallel() - db := glsql.NewDB(t) + db := testdb.New(t) const channel = "storage_repositories_updates" @@ -432,7 +433,7 @@ func TestPostgresListener_Listen_storage_repositories_insert(t *testing.T) { func TestPostgresListener_Listen_storage_repositories_update(t *testing.T) { t.Parallel() - db := glsql.NewDB(t) + db := testdb.New(t) const channel = "storage_repositories_updates" @@ -460,7 +461,7 @@ func TestPostgresListener_Listen_storage_repositories_update(t *testing.T) { func TestPostgresListener_Listen_storage_empty_notification(t *testing.T) { t.Parallel() - db := glsql.NewDB(t) + db := testdb.New(t) const channel = "storage_repositories_updates" @@ -479,7 +480,7 @@ func TestPostgresListener_Listen_storage_empty_notification(t *testing.T) { func TestPostgresListener_Listen_storage_repositories_delete(t *testing.T) { t.Parallel() - db := glsql.NewDB(t) + db := testdb.New(t) const channel = "storage_repositories_updates" @@ -523,7 +524,7 @@ func testListener(t *testing.T, dbName, channel string, setup func(t *testing.T) } opts := DefaultPostgresListenerOpts - opts.Addr = glsql.DSN(glsql.GetDBConfig(t, dbName), true) + opts.Addr = glsql.DSN(testdb.GetConfig(t, dbName), true) opts.Channels = []string{channel} handler := mockListenHandler{OnNotification: callback, OnConnected: func() { close(readyChan) }} diff --git a/internal/praefect/datastore/postgres_test.go b/internal/praefect/datastore/postgres_test.go index 9ae3a78be..f1a8f7237 100644 --- a/internal/praefect/datastore/postgres_test.go +++ b/internal/praefect/datastore/postgres_test.go @@ -5,15 +5,15 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" ) func TestMigrateStatus(t *testing.T) { t.Parallel() - db := glsql.NewDB(t) + db := testdb.New(t) config := config.Config{ - DB: glsql.GetDBConfig(t, db.Name), + DB: testdb.GetConfig(t, db.Name), } _, err := db.Exec("INSERT INTO schema_migrations VALUES ('2020_01_01_test', NOW())") diff --git a/internal/praefect/datastore/queue_bm_test.go b/internal/praefect/datastore/queue_bm_test.go index 8fea25258..fd44b8167 100644 --- a/internal/praefect/datastore/queue_bm_test.go +++ b/internal/praefect/datastore/queue_bm_test.go @@ -4,8 +4,8 @@ import ( "testing" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" ) func BenchmarkPostgresReplicationEventQueue_Acknowledge(b *testing.B) { @@ -31,7 +31,7 @@ func BenchmarkPostgresReplicationEventQueue_Acknowledge(b *testing.B) { } func benchmarkPostgresReplicationEventQueueAcknowledge(b *testing.B, setup map[JobState]int) { - db := glsql.NewDB(b) + db := testdb.New(b) ctx, cancel := testhelper.Context() defer cancel() diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go index aeeff4475..64a73c71f 100644 --- a/internal/praefect/datastore/queue_test.go +++ b/internal/praefect/datastore/queue_test.go @@ -8,13 +8,13 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" ) func TestPostgresReplicationEventQueue_DeleteReplicaUniqueIndex(t *testing.T) { t.Parallel() - db := glsql.NewDB(t) + db := testdb.New(t) for _, tc := range []struct { desc string existingJob *ReplicationEvent @@ -142,7 +142,7 @@ func TestPostgresReplicationEventQueue_DeleteReplicaUniqueIndex(t *testing.T) { func TestPostgresReplicationEventQueue_Enqueue(t *testing.T) { t.Parallel() - db := glsql.NewDB(t) + db := testdb.New(t) ctx, cancel := testhelper.Context() defer cancel() @@ -189,7 +189,7 @@ func TestPostgresReplicationEventQueue_Enqueue(t *testing.T) { func TestPostgresReplicationEventQueue_DeleteReplicaInfiniteAttempts(t *testing.T) { t.Parallel() - queue := NewPostgresReplicationEventQueue(glsql.NewDB(t)) + queue := NewPostgresReplicationEventQueue(testdb.New(t)) ctx, cancel := testhelper.Context() defer cancel() @@ -240,7 +240,7 @@ func TestPostgresReplicationEventQueue_DeleteReplicaInfiniteAttempts(t *testing. func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) { t.Parallel() - db := glsql.NewDB(t) + db := testdb.New(t) ctx, cancel := testhelper.Context() defer cancel() @@ -374,7 +374,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) { func TestPostgresReplicationEventQueue_Dequeue(t *testing.T) { t.Parallel() - db := glsql.NewDB(t) + db := testdb.New(t) ctx, cancel := testhelper.Context() defer cancel() @@ -423,7 +423,7 @@ func TestPostgresReplicationEventQueue_Dequeue(t *testing.T) { // expected results are listed as literals on purpose to be more explicit about what is going on with data func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) { t.Parallel() - db := glsql.NewDB(t) + db := testdb.New(t) ctx, cancel := testhelper.Context() defer cancel() @@ -533,7 +533,7 @@ func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) { func TestPostgresReplicationEventQueue_DequeueSameStorageOtherRepository(t *testing.T) { t.Parallel() - db := glsql.NewDB(t) + db := testdb.New(t) ctx, cancel := testhelper.Context() defer cancel() @@ -596,7 +596,7 @@ func TestPostgresReplicationEventQueue_DequeueSameStorageOtherRepository(t *test func TestPostgresReplicationEventQueue_Acknowledge(t *testing.T) { t.Parallel() - db := glsql.NewDB(t) + db := testdb.New(t) ctx, cancel := testhelper.Context() defer cancel() @@ -640,7 +640,7 @@ func TestPostgresReplicationEventQueue_Acknowledge(t *testing.T) { func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { t.Parallel() - db := glsql.NewDB(t) + db := testdb.New(t) ctx, cancel := testhelper.Context() defer cancel() @@ -829,7 +829,7 @@ func TestPostgresReplicationEventQueue_StartHealthUpdate(t *testing.T) { eventType4 := eventType1 eventType4.Job.TargetNodeStorage = "s-2" - db := glsql.NewDB(t) + db := testdb.New(t) t.Run("no events is valid", func(t *testing.T) { // 'qc' is not initialized, so the test will fail if there will be an attempt to make SQL operation @@ -976,7 +976,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) { eventType4 := eventType3 eventType4.Job.TargetNodeStorage = "gitaly-3" - db := glsql.NewDB(t) + db := testdb.New(t) t.Run("no stale jobs yet", func(t *testing.T) { db.TruncateAll(t) @@ -1065,7 +1065,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) { }) } -func requireEvents(t *testing.T, ctx context.Context, db glsql.DB, expected []ReplicationEvent) { +func requireEvents(t *testing.T, ctx context.Context, db testdb.DB, expected []ReplicationEvent) { t.Helper() // as it is not possible to expect exact time of entity creation/update we do not fetch it from database @@ -1093,7 +1093,7 @@ type LockRow struct { Acquired bool } -func requireLocks(t *testing.T, ctx context.Context, db glsql.DB, expected []LockRow) { +func requireLocks(t *testing.T, ctx context.Context, db testdb.DB, expected []LockRow) { t.Helper() sqlStmt := `SELECT id, acquired FROM replication_queue_lock` @@ -1118,7 +1118,7 @@ type JobLockRow struct { TriggeredAt time.Time } -func requireJobLocks(t *testing.T, ctx context.Context, db glsql.DB, expected []JobLockRow) { +func requireJobLocks(t *testing.T, ctx context.Context, db testdb.DB, expected []JobLockRow) { t.Helper() actual := fetchJobLocks(t, ctx, db) @@ -1128,7 +1128,7 @@ func requireJobLocks(t *testing.T, ctx context.Context, db glsql.DB, expected [] require.ElementsMatch(t, expected, actual) } -func fetchJobLocks(t *testing.T, ctx context.Context, db glsql.DB) []JobLockRow { +func fetchJobLocks(t *testing.T, ctx context.Context, db testdb.DB) []JobLockRow { t.Helper() sqlStmt := `SELECT job_id, lock_id, triggered_at FROM replication_queue_job_lock ORDER BY job_id` rows, err := db.QueryContext(ctx, sqlStmt) diff --git a/internal/praefect/datastore/repository_store_bm_test.go b/internal/praefect/datastore/repository_store_bm_test.go index 8555c971b..b0ffbbfad 100644 --- a/internal/praefect/datastore/repository_store_bm_test.go +++ b/internal/praefect/datastore/repository_store_bm_test.go @@ -5,8 +5,8 @@ import ( "testing" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" ) // The test setup takes a lot of time, so it is better to run each sub-benchmark separately with limit on number of repeats. @@ -38,7 +38,7 @@ func BenchmarkPostgresRepositoryStore_GetConsistentStorages(b *testing.B) { } func benchmarkGetConsistentStorages(b *testing.B, nstorages, nrepositories int) { - db := glsql.NewDB(b) + db := testdb.New(b) ctx, cancel := testhelper.Context() defer cancel() diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go index 291e56a26..7692a4779 100644 --- a/internal/praefect/datastore/repository_store_test.go +++ b/internal/praefect/datastore/repository_store_test.go @@ -116,7 +116,7 @@ FROM storage_repositories } func TestRepositoryStore_Postgres(t *testing.T) { - db := glsql.NewDB(t) + db := testdb.New(t) testRepositoryStore(t, func(t *testing.T, storages map[string][]string) (RepositoryStore, requireStateFunc) { db.TruncateAll(t) gs := NewPostgresRepositoryStore(db, storages) @@ -129,7 +129,7 @@ func TestRepositoryStore_Postgres(t *testing.T) { } func TestRepositoryStore_incrementGenerationConcurrently(t *testing.T) { - db := glsql.NewDB(t) + db := testdb.New(t) type call struct { primary string @@ -214,7 +214,7 @@ func TestRepositoryStore_incrementGenerationConcurrently(t *testing.T) { require.NoError(t, err) go func() { - glsql.WaitForBlockedQuery(ctx, t, db, "WITH updated_replicas AS (") + testdb.WaitForBlockedQuery(ctx, t, db, "WITH updated_replicas AS (") firstTx.Commit(t) }() @@ -1191,7 +1191,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { func TestPostgresRepositoryStore_GetRepositoryMetadata(t *testing.T) { t.Parallel() - db := glsql.NewDB(t) + db := testdb.New(t) for _, tc := range []struct { desc string nonExistentRepository bool diff --git a/internal/praefect/datastore/storage_cleanup_test.go b/internal/praefect/datastore/storage_cleanup_test.go index 419115119..77ee0dd22 100644 --- a/internal/praefect/datastore/storage_cleanup_test.go +++ b/internal/praefect/datastore/storage_cleanup_test.go @@ -6,15 +6,15 @@ import ( "time" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" ) func TestStorageCleanup_Populate(t *testing.T) { t.Parallel() ctx, cancel := testhelper.Context() defer cancel() - db := glsql.NewDB(t) + db := testdb.New(t) storageCleanup := NewStorageCleanup(db.DB) require.NoError(t, storageCleanup.Populate(ctx, "praefect", "gitaly-1")) @@ -37,7 +37,7 @@ func TestStorageCleanup_AcquireNextStorage(t *testing.T) { t.Parallel() ctx, cancel := testhelper.Context() defer cancel() - db := glsql.NewDB(t) + db := testdb.New(t) storageCleanup := NewStorageCleanup(db.DB) t.Run("ok", func(t *testing.T) { @@ -179,7 +179,7 @@ func TestStorageCleanup_Exists(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - db := glsql.NewDB(t) + db := testdb.New(t) repoStore := NewPostgresRepositoryStore(db.DB, nil) require.NoError(t, repoStore.CreateRepository(ctx, 0, "vs", "p/1", "replica-path-1", "g1", []string{"g2", "g3"}, nil, false, false)) @@ -250,7 +250,7 @@ type storageCleanupRow struct { TriggeredAt sql.NullTime } -func getAllStoragesCleanup(t testing.TB, db glsql.DB) []storageCleanupRow { +func getAllStoragesCleanup(t testing.TB, db testdb.DB) []storageCleanupRow { rows, err := db.Query(`SELECT * FROM storage_cleanups`) require.NoError(t, err) defer func() { diff --git a/internal/praefect/datastore/storage_provider_test.go b/internal/praefect/datastore/storage_provider_test.go index c3c6f2e98..8c0fd2fd9 100644 --- a/internal/praefect/datastore/storage_provider_test.go +++ b/internal/praefect/datastore/storage_provider_test.go @@ -16,12 +16,13 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" ) func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { t.Parallel() - db := glsql.NewDB(t) + db := testdb.New(t) rs := NewPostgresRepositoryStore(db, nil) t.Run("unknown virtual storage", func(t *testing.T) { diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go index 8b267a8e0..d9eed5a81 100644 --- a/internal/praefect/helper_test.go +++ b/internal/praefect/helper_test.go @@ -15,7 +15,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/log" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/grpc-proxy/proxy" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/mock" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes" @@ -23,6 +22,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/transactions" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/promtest" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" correlation "gitlab.com/gitlab-org/labkit/correlation/grpc" "google.golang.org/grpc" "google.golang.org/grpc/health" @@ -119,7 +119,7 @@ func withMockBackends(t testing.TB, backends map[string]mock.SimpleServiceServer } func defaultQueue(t testing.TB) datastore.ReplicationEventQueue { - return datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)) + return datastore.NewPostgresReplicationEventQueue(testdb.New(t)) } func defaultTxMgr(conf config.Config) *transactions.Manager { diff --git a/internal/praefect/info_service_test.go b/internal/praefect/info_service_test.go index 64129875b..8e270b87b 100644 --- a/internal/praefect/info_service_test.go +++ b/internal/praefect/info_service_test.go @@ -12,7 +12,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/repository" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" @@ -65,7 +64,7 @@ func TestInfoService_RepositoryReplicas(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - tx := glsql.NewDB(t).Begin(t) + tx := testdb.New(t).Begin(t) defer tx.Rollback(t) testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{ diff --git a/internal/praefect/nodes/health_manager_test.go b/internal/praefect/nodes/health_manager_test.go index 9645d7a9a..9dd4e3da8 100644 --- a/internal/praefect/nodes/health_manager_test.go +++ b/internal/praefect/nodes/health_manager_test.go @@ -10,6 +10,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" ) @@ -60,7 +61,7 @@ func TestHealthManager(t *testing.T) { HealthConsensus map[string][]string } - db := glsql.NewDB(t) + db := testdb.New(t) for _, tc := range []struct { desc string @@ -558,7 +559,7 @@ func TestHealthManager_databaseTimeout(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - db := glsql.NewDB(t) + db := testdb.New(t) blockingTx := db.Begin(t) defer blockingTx.Rollback(t) @@ -596,14 +597,14 @@ func TestHealthManager_databaseTimeout(t *testing.T) { }() // Wait until the blocked query is waiting. - glsql.WaitForBlockedQuery(ctx, t, db, "INSERT INTO node_status") + testdb.WaitForBlockedQuery(ctx, t, db, "INSERT INTO node_status") // Simulate a timeout. timeoutQuery() // Query should have been canceled. require.EqualError(t, <-blockedErr, "update checks: pq: canceling statement due to user request") } -func predateHealthChecks(t testing.TB, db glsql.DB, amount time.Duration) { +func predateHealthChecks(t testing.TB, db testdb.DB, amount time.Duration) { t.Helper() _, err := db.Exec(` @@ -618,7 +619,7 @@ func predateHealthChecks(t testing.TB, db glsql.DB, amount time.Duration) { // This test case ensures the record updates are done in an ordered manner to avoid concurrent writes // deadlocking. Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/3907 func TestHealthManager_orderedWrites(t *testing.T) { - db := glsql.NewDB(t) + db := testdb.New(t) tx1 := db.Begin(t).Tx defer func() { _ = tx1.Rollback() }() @@ -648,7 +649,7 @@ func TestHealthManager_orderedWrites(t *testing.T) { }() // Wait for tx2 to be blocked on the gitaly-1 lock acquired by tx1 - glsql.WaitForBlockedQuery(ctx, t, db, "INSERT INTO node_status") + testdb.WaitForBlockedQuery(ctx, t, db, "INSERT INTO node_status") // Ensure tx1 can acquire lock on gitaly-2. require.NoError(t, hm1.updateHealthChecks(ctx, []string{virtualStorage}, []string{"gitaly-2"}, []bool{true})) diff --git a/internal/praefect/nodes/per_repository_test.go b/internal/praefect/nodes/per_repository_test.go index 8d872a5dd..74a9915b8 100644 --- a/internal/praefect/nodes/per_repository_test.go +++ b/internal/praefect/nodes/per_repository_test.go @@ -9,7 +9,6 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" ) @@ -47,7 +46,7 @@ func TestPerRepositoryElector(t *testing.T) { primary matcher } - db := glsql.NewDB(t) + db := testdb.New(t) for _, tc := range []struct { desc string @@ -523,7 +522,7 @@ func TestPerRepositoryElector(t *testing.T) { const repositoryID int64 = 1 for _, step := range tc.steps { - runElection := func(tx *glsql.TxWrapper) (string, *logrus.Entry) { + runElection := func(tx *testdb.TxWrapper) (string, *logrus.Entry) { testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect-0": step.healthyNodes}) logger, hook := test.NewNullLogger() diff --git a/internal/praefect/nodes/sql_elector_test.go b/internal/praefect/nodes/sql_elector_test.go index a483d4271..9050d7287 100644 --- a/internal/praefect/nodes/sql_elector_test.go +++ b/internal/praefect/nodes/sql_elector_test.go @@ -16,6 +16,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/promtest" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -29,7 +30,7 @@ var shardName = "test-shard-0" func TestGetPrimaryAndSecondaries(t *testing.T) { t.Parallel() - db := glsql.NewDB(t) + db := testdb.New(t) logger := testhelper.NewDiscardingLogger(t).WithField("test", t.Name()) praefectSocket := testhelper.GetTemporaryGitalySocketFileName(t) @@ -74,7 +75,7 @@ func TestGetPrimaryAndSecondaries(t *testing.T) { func TestSqlElector_slow_execution(t *testing.T) { t.Parallel() - db := glsql.NewDB(t) + db := testdb.New(t) praefectSocket := "unix://" + testhelper.GetTemporaryGitalySocketFileName(t) logger := testhelper.NewDiscardingLogger(t).WithField("test", t.Name()) @@ -113,7 +114,7 @@ func TestSqlElector_slow_execution(t *testing.T) { func TestBasicFailover(t *testing.T) { t.Parallel() - db := glsql.NewDB(t) + db := testdb.New(t) logger := testhelper.NewDiscardingLogger(t).WithField("test", t.Name()) praefectSocket := testhelper.GetTemporaryGitalySocketFileName(t) @@ -224,7 +225,7 @@ func TestBasicFailover(t *testing.T) { func TestElectDemotedPrimary(t *testing.T) { t.Parallel() - tx := glsql.NewDB(t).Begin(t) + tx := testdb.New(t).Begin(t) defer tx.Rollback(t) node := config.Node{Storage: "gitaly-0"} @@ -263,7 +264,7 @@ func TestElectDemotedPrimary(t *testing.T) { // predateLastSeenActiveAt shifts the last_seen_active_at column to an earlier time. This avoids // waiting for the node's status to become unhealthy. -func predateLastSeenActiveAt(t testing.TB, db glsql.DB, shardName, nodeName string, amount time.Duration) { +func predateLastSeenActiveAt(t testing.TB, db testdb.DB, shardName, nodeName string, amount time.Duration) { t.Helper() _, err := db.Exec(` @@ -290,7 +291,7 @@ func predateElection(t testing.TB, ctx context.Context, db glsql.Querier, shardN func TestElectNewPrimary(t *testing.T) { t.Parallel() - db := glsql.NewDB(t) + db := testdb.New(t) ns := []*nodeStatus{{ node: config.Node{ @@ -462,7 +463,7 @@ func TestConnectionMultiplexing(t *testing.T) { go srv.Serve(ln) - db := glsql.NewDB(t) + db := testdb.New(t) mgr, err := NewManager( testhelper.NewDiscardingLogEntry(t), config.Config{ diff --git a/internal/praefect/reconciler/reconciler_benchmark_test.go b/internal/praefect/reconciler/reconciler_benchmark_test.go index cbcbebfe3..4c11630b8 100644 --- a/internal/praefect/reconciler/reconciler_benchmark_test.go +++ b/internal/praefect/reconciler/reconciler_benchmark_test.go @@ -7,8 +7,8 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" ) func BenchmarkReconcile(b *testing.B) { @@ -31,7 +31,7 @@ func benchmarkReconcile(b *testing.B, numRepositories int, worstCase bool) { ctx, cancel := testhelper.Context() defer cancel() - db := glsql.NewDB(b) + db := testdb.New(b) behind := 0 if worstCase { diff --git a/internal/praefect/reconciler/reconciler_test.go b/internal/praefect/reconciler/reconciler_test.go index 0fea162ad..54b9e7b93 100644 --- a/internal/praefect/reconciler/reconciler_test.go +++ b/internal/praefect/reconciler/reconciler_test.go @@ -15,8 +15,8 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/praefect" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" ) func TestReconciler(t *testing.T) { @@ -82,7 +82,7 @@ func TestReconciler(t *testing.T) { return out } - db := glsql.NewDB(t) + db := testdb.New(t) for _, tc := range []struct { desc string @@ -1176,7 +1176,7 @@ func TestReconciler_renames(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - db := glsql.NewDB(t) + db := testdb.New(t) for _, tc := range []struct { desc string diff --git a/internal/praefect/remove_repository_test.go b/internal/praefect/remove_repository_test.go index 0aec0c1f8..d2a0e9629 100644 --- a/internal/praefect/remove_repository_test.go +++ b/internal/praefect/remove_repository_test.go @@ -13,11 +13,11 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/grpc-proxy/proxy" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "google.golang.org/grpc" @@ -41,7 +41,7 @@ func testRemoveRepositoryHandler(t *testing.T, ctx context.Context) { errNotFound = helper.ErrNotFoundf("repository does not exist") } - db := glsql.NewDB(t) + db := testdb.New(t) for _, tc := range []struct { desc string routeToGitaly bool diff --git a/internal/praefect/replicator_pg_test.go b/internal/praefect/replicator_pg_test.go index c8cea0716..025597a0b 100644 --- a/internal/praefect/replicator_pg_test.go +++ b/internal/praefect/replicator_pg_test.go @@ -10,8 +10,8 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/client" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/repository" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "google.golang.org/grpc" ) @@ -49,7 +49,7 @@ func TestReplicatorInvalidSourceRepository(t *testing.T) { require.NoError(t, err) defer testhelper.MustClose(t, targetCC) - rs := datastore.NewPostgresRepositoryStore(glsql.NewDB(t), nil) + rs := datastore.NewPostgresRepositoryStore(testdb.New(t), nil) require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage-1", "relative-path-1", "relative-path-1", "gitaly-1", nil, nil, true, false)) @@ -76,7 +76,7 @@ func TestReplicatorInvalidSourceRepository(t *testing.T) { func TestReplicatorDestroy(t *testing.T) { t.Parallel() - db := glsql.NewDB(t) + db := testdb.New(t) for _, tc := range []struct { change datastore.ChangeType error error diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 2dbf02045..3d13b2c3c 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -28,13 +28,13 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/transactions" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/promtest" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "gitlab.com/gitlab-org/labkit/correlation" @@ -150,7 +150,7 @@ func TestReplMgr_ProcessBacklog(t *testing.T) { logger := testhelper.NewDiscardingLogger(t) loggerHook := test.NewLocal(logger) - queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t))) + queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) queue.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) { cancel() // when it is called we know that replication is finished return queue.Acknowledge(ctx, state, ids) @@ -160,7 +160,7 @@ func TestReplMgr_ProcessBacklog(t *testing.T) { _, err = queue.Enqueue(ctx, events[0]) require.NoError(t, err) - db := glsql.NewDB(t) + db := testdb.New(t) rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) require.NoError(t, rs.CreateRepository(ctx, repositoryID, conf.VirtualStorages[0].Name, testRepo.GetRelativePath(), testRepo.GetRelativePath(), shard.Primary.GetStorage(), nil, nil, true, false)) @@ -311,7 +311,7 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) { // unlinkat /tmp/gitaly-222007427/381349228/storages.d/internal-gitaly-1/+gitaly/state/path/to/repo: directory not empty // By using WaitGroup we are sure the test cleanup will be started after all replication // requests are completed, so no running cache IO operations happen. - queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t))) + queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) var wg sync.WaitGroup queue.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) { wg.Add(1) @@ -688,7 +688,7 @@ func TestProcessBacklog_FailedJobs(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t))) + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) // this job exists to verify that replication works okJob := datastore.ReplicationJob{ @@ -717,7 +717,7 @@ func TestProcessBacklog_FailedJobs(t *testing.T) { nodeMgr.Start(0, time.Hour) defer nodeMgr.Stop() - db := glsql.NewDB(t) + db := testdb.New(t) rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) require.NoError(t, rs.CreateRepository(ctx, okJob.RepositoryID, okJob.VirtualStorage, okJob.RelativePath, okJob.RelativePath, okJob.SourceNodeStorage, nil, nil, true, false)) @@ -795,7 +795,7 @@ func TestProcessBacklog_Success(t *testing.T) { }, } - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t))) + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) queueInterceptor.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) { ackIDs, err := queue.Acknowledge(ctx, state, ids) if len(ids) > 0 { @@ -874,7 +874,7 @@ func TestProcessBacklog_Success(t *testing.T) { nodeMgr.Start(0, time.Hour) defer nodeMgr.Stop() - db := glsql.NewDB(t) + db := testdb.New(t) rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) require.NoError(t, rs.CreateRepository(ctx, eventType1.Job.RepositoryID, eventType1.Job.VirtualStorage, eventType1.Job.VirtualStorage, eventType1.Job.RelativePath, eventType1.Job.SourceNodeStorage, nil, nil, true, false)) @@ -921,7 +921,7 @@ func TestReplMgrProcessBacklog_OnlyHealthyNodes(t *testing.T) { var mtx sync.Mutex expStorages := map[string]bool{conf.VirtualStorages[0].Nodes[0].Storage: true, conf.VirtualStorages[0].Nodes[2].Storage: true} - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t))) + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) queueInterceptor.OnDequeue(func(_ context.Context, virtualStorageName string, storageName string, _ int, _ datastore.ReplicationEventQueue) ([]datastore.ReplicationEvent, error) { select { case <-ctx.Done(): @@ -1004,7 +1004,7 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) { }, } - queue := datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)) + queue := datastore.NewPostgresReplicationEventQueue(testdb.New(t)) _, err := queue.Enqueue(ctx, datastore.ReplicationEvent{ Job: datastore.ReplicationJob{ RepositoryID: 1, @@ -1017,7 +1017,7 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) { }) require.NoError(t, err) - db := glsql.NewDB(t) + db := testdb.New(t) rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) require.NoError(t, rs.CreateRepository(ctx, repositoryID, virtualStorage, "ignored", "ignored", primaryStorage, []string{secondaryStorage}, nil, true, false)) diff --git a/internal/praefect/repocleaner/repository_test.go b/internal/praefect/repocleaner/repository_test.go index 029be5d3a..7acf5f142 100644 --- a/internal/praefect/repocleaner/repository_test.go +++ b/internal/praefect/repocleaner/repository_test.go @@ -17,11 +17,11 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/praefect" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/service/transaction" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" ) @@ -48,8 +48,8 @@ func TestRunner_Run(t *testing.T) { g2Addr := testserver.RunGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) g3Addr := testserver.RunGitalyServer(t, g3Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) - db := glsql.NewDB(t) - dbConf := glsql.GetDBConfig(t, db.Name) + db := testdb.New(t) + dbConf := testdb.GetConfig(t, db.Name) conf := config.Config{ SocketPath: testhelper.GetTemporaryGitalySocketFileName(t), @@ -197,8 +197,8 @@ func TestRunner_Run_noAvailableStorages(t *testing.T) { g1Cfg := testcfg.Build(t, testcfg.WithStorages(storage1)) g1Addr := testserver.RunGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) - db := glsql.NewDB(t) - dbConf := glsql.GetDBConfig(t, db.Name) + db := testdb.New(t) + dbConf := testdb.GetConfig(t, db.Name) conf := config.Config{ SocketPath: testhelper.GetTemporaryGitalySocketFileName(t), diff --git a/internal/praefect/repository_exists_test.go b/internal/praefect/repository_exists_test.go index 2d7d2fe84..af688dc32 100644 --- a/internal/praefect/repository_exists_test.go +++ b/internal/praefect/repository_exists_test.go @@ -9,10 +9,10 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/grpc-proxy/proxy" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -23,7 +23,7 @@ func TestRepositoryExistsHandler(t *testing.T) { t.Parallel() errServedByGitaly := status.Error(codes.Unknown, "request passed to Gitaly") - db := glsql.NewDB(t) + db := testdb.New(t) for _, tc := range []struct { desc string routeToGitaly bool diff --git a/internal/praefect/router_per_repository_test.go b/internal/praefect/router_per_repository_test.go index 8372422d4..ca2079639 100644 --- a/internal/praefect/router_per_repository_test.go +++ b/internal/praefect/router_per_repository_test.go @@ -9,7 +9,6 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" @@ -116,7 +115,7 @@ func TestPerRepositoryRouter_RouteStorageAccessor(t *testing.T) { func TestPerRepositoryRouter_RouteRepositoryAccessor(t *testing.T) { t.Parallel() - db := glsql.NewDB(t) + db := testdb.New(t) const relativePath = "repository" @@ -269,7 +268,7 @@ func TestPerRepositoryRouter_RouteRepositoryAccessor(t *testing.T) { func TestPerRepositoryRouter_RouteRepositoryMutator(t *testing.T) { t.Parallel() - db := glsql.NewDB(t) + db := testdb.New(t) configuredNodes := map[string][]string{ "virtual-storage-1": {"primary", "secondary-1", "secondary-2"}, @@ -482,7 +481,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { secondary1Conn := &grpc.ClientConn{} secondary2Conn := &grpc.ClientConn{} - db := glsql.NewDB(t) + db := testdb.New(t) const relativePath = "relative-path" diff --git a/internal/praefect/server_factory_test.go b/internal/praefect/server_factory_test.go index 241ce6f86..d3563f1ef 100644 --- a/internal/praefect/server_factory_test.go +++ b/internal/praefect/server_factory_test.go @@ -23,7 +23,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/listenmux" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/service/transaction" @@ -32,6 +31,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/promtest" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "google.golang.org/grpc" @@ -71,7 +71,7 @@ func TestServerFactory(t *testing.T) { revision := text.ChompBytes(gittest.Exec(t, cfg, "-C", repoPath, "rev-parse", "HEAD")) logger := testhelper.NewDiscardingLogEntry(t) - queue := datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)) + queue := datastore.NewPostgresReplicationEventQueue(testdb.New(t)) rs := datastore.MockRepositoryStore{} txMgr := transactions.NewManager(conf) diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index c54714143..61d718f03 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -31,7 +31,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/grpc-proxy/proxy" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/mock" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes" @@ -536,7 +535,7 @@ func testRemoveRepository(t *testing.T, ctx context.Context) { verifyReposExistence(t, codes.OK) - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t))) + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) repoStore := defaultRepoStore(praefectCfg) txMgr := defaultTxMgr(praefectCfg) nodeMgr, err := nodes.NewManager(testhelper.NewDiscardingLogEntry(t), praefectCfg, nil, @@ -633,9 +632,9 @@ func TestRenameRepository(t *testing.T) { repoPaths[i] = filepath.Join(gitalyCfg.Storages[0].Path, relativePath) } - evq := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t))) + evq := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) - tx := glsql.NewDB(t).Begin(t) + tx := testdb.New(t).Begin(t) defer tx.Rollback(t) rs := datastore.NewPostgresRepositoryStore(tx, nil) @@ -834,7 +833,7 @@ func TestProxyWrites(t *testing.T) { }, } - queue := datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)) + queue := datastore.NewPostgresReplicationEventQueue(testdb.New(t)) entry := testhelper.NewDiscardingLogEntry(t) nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil) @@ -970,7 +969,7 @@ func TestErrorThreshold(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - queue := datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)) + queue := datastore.NewPostgresReplicationEventQueue(testdb.New(t)) entry := testhelper.NewDiscardingLogEntry(t) testCases := []struct { diff --git a/internal/testhelper/testdb/db.go b/internal/testhelper/testdb/db.go index 73c5d39f8..529ebb4a0 100644 --- a/internal/testhelper/testdb/db.go +++ b/internal/testhelper/testdb/db.go @@ -2,50 +2,402 @@ package testdb import ( "context" + "database/sql" + "errors" + "net" + "os" + "os/exec" + "strconv" + "strings" + "sync" "testing" + "time" - "github.com/lib/pq" + "github.com/google/uuid" + migrate "github.com/rubenv/sql-migrate" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" ) -// SetHealthyNodes sets the healthy nodes in the database as determined by the passed in map. The healthyNodes map is keyed by -// praefect name -> virtual storage -> storage. On each run, it clears all previous health checks from the table, so the -// passed in nodes are the only ones considered healthy after the function. As the healthy nodes are determined by the time of -// the last successful health check, this should be run in the same transastion as the tested query to prevent flakiness. -// -//nolint:revive -func SetHealthyNodes(t testing.TB, ctx context.Context, db glsql.Querier, healthyNodes map[string]map[string][]string) { - t.Helper() - - var praefects, virtualStorages, storages []string - for praefect, virtualStors := range healthyNodes { - for virtualStorage, stors := range virtualStors { - for _, storage := range stors { - praefects = append(praefects, praefect) - virtualStorages = append(virtualStorages, virtualStorage) - storages = append(storages, storage) - } - } +const ( + advisoryLockIDDatabaseTemplate = 1627644550 + praefectTemplateDatabase = "praefect_template" +) + +// TxWrapper is a simple wrapper around *sql.Tx. +type TxWrapper struct { + *sql.Tx +} + +// Rollback executes Rollback operation on the wrapped *sql.Tx if it is set. +// After execution is sets Tx to nil to prevent errors on the repeated invocations (useful +// for testing when Rollback is deferred). +func (txw *TxWrapper) Rollback(t testing.TB) { + t.Helper() + if txw.Tx != nil { + require.NoError(t, txw.Tx.Rollback()) + txw.Tx = nil + } +} + +// Commit executes Commit operation on the wrapped *sql.Tx if it is set. +// After execution is sets Tx to nil to prevent errors on the deferred invocations (useful +// for testing when Rollback is deferred). +func (txw *TxWrapper) Commit(t testing.TB) { + t.Helper() + if txw.Tx != nil { + require.NoError(t, txw.Tx.Commit()) + txw.Tx = nil + } +} + +// DB is a helper struct that should be used only for testing purposes. +type DB struct { + *sql.DB + // Name is a name of the database. + Name string +} + +// Begin starts a new transaction and returns it wrapped into TxWrapper. +func (db DB) Begin(t testing.TB) *TxWrapper { + t.Helper() + tx, err := db.DB.Begin() + require.NoError(t, err) + return &TxWrapper{Tx: tx} +} + +// Truncate removes all data from the list of tables and restarts identities for them. +func (db DB) Truncate(t testing.TB, tables ...string) { + t.Helper() + + for _, table := range tables { + _, err := db.DB.Exec("DELETE FROM " + table) + require.NoError(t, err, "database cleanup failed: %s", tables) } - _, err := db.ExecContext(ctx, ` -WITH clear_previous_checks AS ( DELETE FROM node_status ) - -INSERT INTO node_status (praefect_name, shard_name, node_name, last_contact_attempt_at, last_seen_active_at) -SELECT - unnest($1::text[]) AS praefect_name, - unnest($2::text[]) AS shard_name, - unnest($3::text[]) AS node_name, - NOW() AS last_contact_attempt_at, - NOW() AS last_seen_active_at -ON CONFLICT (praefect_name, shard_name, node_name) DO UPDATE SET - last_contact_attempt_at = NOW(), - last_seen_active_at = NOW() - `, - pq.StringArray(praefects), - pq.StringArray(virtualStorages), - pq.StringArray(storages), + _, err := db.DB.Exec("SELECT setval(relname::TEXT, 1, false) from pg_class where relkind = 'S'") + require.NoError(t, err, "database cleanup failed: %s", tables) +} + +// RequireRowsInTable verifies that `tname` table has `n` amount of rows in it. +func (db DB) RequireRowsInTable(t *testing.T, tname string, n int) { + t.Helper() + + var count int + require.NoError(t, db.QueryRow("SELECT COUNT(*) FROM "+tname).Scan(&count)) + require.Equal(t, n, count, "unexpected amount of rows in table: %d instead of %d", count, n) +} + +// TruncateAll removes all data from known set of tables. +func (db DB) TruncateAll(t testing.TB) { + db.Truncate(t, + "replication_queue_job_lock", + "replication_queue", + "replication_queue_lock", + "node_status", + "shard_primaries", + "storage_repositories", + "repositories", + "virtual_storages", + "repository_assignments", + "storage_cleanups", ) +} + +// MustExec executes `q` with `args` and verifies there are no errors. +func (db DB) MustExec(t testing.TB, q string, args ...interface{}) { + _, err := db.DB.Exec(q, args...) + require.NoError(t, err) +} + +// Close removes schema if it was used and releases connection pool. +func (db DB) Close() error { + if err := db.DB.Close(); err != nil { + return errors.New("failed to release connection pool: " + err.Error()) + } + return nil +} + +// New returns a wrapper around the database connection pool. +// Must be used only for testing. +// The new database with empty relations will be created for each call of this function. +// It uses env vars: +// PGHOST - required, URL/socket/dir +// PGPORT - required, binding port +// PGUSER - optional, user - `$ whoami` would be used if not provided +// Once the test is completed the database will be dropped on test cleanup execution. +func New(t testing.TB) DB { + t.Helper() + database := "praefect_" + strings.ReplaceAll(uuid.New().String(), "-", "") + return DB{DB: initPraefectDB(t, database), Name: database} +} + +// GetConfig returns the database configuration determined by +// environment variables. See NewDB() for the list of variables. +func GetConfig(t testing.TB, database string) config.DB { + env := getDatabaseEnvironment(t) + + require.Contains(t, env, "PGHOST", "PGHOST env var expected to be provided to connect to Postgres database") + require.Contains(t, env, "PGPORT", "PGHOST env var expected to be provided to connect to Postgres database") + + portNumber, err := strconv.Atoi(env["PGPORT"]) + require.NoError(t, err, "PGPORT must be a port number of the Postgres database listens for incoming connections") + + // connect to 'postgres' database first to re-create testing database from scratch + conf := config.DB{ + Host: env["PGHOST"], + Port: portNumber, + DBName: database, + SSLMode: "disable", + User: env["PGUSER"], + SessionPooled: config.DBConnection{ + Host: env["PGHOST"], + Port: portNumber, + }, + } + + if bouncerHost, ok := env["PGHOST_PGBOUNCER"]; ok { + conf.Host = bouncerHost + } + + if bouncerPort, ok := env["PGPORT_PGBOUNCER"]; ok { + bouncerPortNumber, err := strconv.Atoi(bouncerPort) + require.NoError(t, err, "PGPORT_PGBOUNCER must be a port number of the PgBouncer") + conf.Port = bouncerPortNumber + } + + return conf +} + +func requireSQLOpen(t testing.TB, dbCfg config.DB, direct bool) *sql.DB { + t.Helper() + db, err := sql.Open("postgres", glsql.DSN(dbCfg, direct)) + require.NoErrorf(t, err, "failed to connect to %q database", dbCfg.DBName) + if !assert.NoErrorf(t, db.Ping(), "failed to communicate with %q database", dbCfg.DBName) { + require.NoErrorf(t, db.Close(), "release connection to the %q database", dbCfg.DBName) + } + return db +} + +func requireTerminateAllConnections(t testing.TB, db *sql.DB, database string) { + t.Helper() + _, err := db.Exec("SELECT PG_TERMINATE_BACKEND(pid) FROM PG_STAT_ACTIVITY WHERE datname = '" + database + "'") require.NoError(t, err) + + // Once the pg_terminate_backend has completed, we may need to wait before the connections + // are fully released. pg_terminate_backend will return true as long as the signal was + // sent successfully, but the backend needs to respond to the signal to close the connection. + // TODO: In Postgre 14, pg_terminate_backend takes an optional timeout argument that makes it a blocking + // call. https://gitlab.com/gitlab-org/gitaly/-/issues/3937 tracks the refactor work to remove this + // require.Eventuallyf call in favor of passing in a timeout to pg_terminate_backend + require.Eventuallyf(t, func() bool { + var openConnections int + require.NoError(t, db.QueryRow( + `SELECT COUNT(*) FROM pg_stat_activity + WHERE datname = $1 AND pid != pg_backend_pid()`, database). + Scan(&openConnections)) + return openConnections == 0 + }, 20*time.Second, 10*time.Millisecond, "wait for all connections to be terminated") +} + +func initPraefectDB(t testing.TB, database string) *sql.DB { + t.Helper() + + dbCfg := GetConfig(t, "postgres") + // We require a direct connection to the Postgres instance and not through the PgBouncer + // because we use transaction pool mood for it and it doesn't work well for system advisory locks. + postgresDB := requireSQLOpen(t, dbCfg, true) + defer func() { require.NoErrorf(t, postgresDB.Close(), "release connection to the %q database", dbCfg.DBName) }() + + // Acquire exclusive advisory lock to prevent other concurrent test from doing the same. + _, err := postgresDB.Exec(`SELECT pg_advisory_lock($1)`, advisoryLockIDDatabaseTemplate) + require.NoError(t, err, "not able to acquire lock for synchronisation") + var advisoryUnlock func() + advisoryUnlock = func() { + require.True(t, scanSingleBool(t, postgresDB, `SELECT pg_advisory_unlock($1)`, advisoryLockIDDatabaseTemplate), "release advisory lock") + advisoryUnlock = func() {} + } + defer func() { advisoryUnlock() }() + + templateDBExists := databaseExist(t, postgresDB, praefectTemplateDatabase) + if !templateDBExists { + _, err := postgresDB.Exec("CREATE DATABASE " + praefectTemplateDatabase + " WITH ENCODING 'UTF8'") + require.NoErrorf(t, err, "failed to create %q database", praefectTemplateDatabase) + } + + templateDBConf := GetConfig(t, praefectTemplateDatabase) + templateDB := requireSQLOpen(t, templateDBConf, true) + defer func() { + require.NoErrorf(t, templateDB.Close(), "release connection to the %q database", templateDBConf.DBName) + }() + + if _, err := glsql.Migrate(templateDB, false); err != nil { + // If database has unknown migration we try to re-create template database with + // current migration. It may be caused by other code changes done in another branch. + if pErr := (*migrate.PlanError)(nil); errors.As(err, &pErr) { + if strings.EqualFold(pErr.ErrorMessage, "unknown migration in database") { + require.NoErrorf(t, templateDB.Close(), "release connection to the %q database", templateDBConf.DBName) + + _, err = postgresDB.Exec("DROP DATABASE " + praefectTemplateDatabase) + require.NoErrorf(t, err, "failed to drop %q database", praefectTemplateDatabase) + _, err = postgresDB.Exec("CREATE DATABASE " + praefectTemplateDatabase + " WITH ENCODING 'UTF8'") + require.NoErrorf(t, err, "failed to create %q database", praefectTemplateDatabase) + + remigrateTemplateDB := requireSQLOpen(t, templateDBConf, true) + defer func() { + require.NoErrorf(t, remigrateTemplateDB.Close(), "release connection to the %q database", templateDBConf.DBName) + }() + _, err = glsql.Migrate(remigrateTemplateDB, false) + require.NoErrorf(t, err, "failed to run database migration on %q", praefectTemplateDatabase) + } else { + require.NoErrorf(t, err, "failed to run database migration on %q", praefectTemplateDatabase) + } + } else { + require.NoErrorf(t, err, "failed to run database migration on %q", praefectTemplateDatabase) + } + } + + // Release advisory lock as soon as possible to unblock other tests from execution. + advisoryUnlock() + + require.NoErrorf(t, templateDB.Close(), "release connection to the %q database", templateDBConf.DBName) + + _, err = postgresDB.Exec(`CREATE DATABASE ` + database + ` TEMPLATE ` + praefectTemplateDatabase) + require.NoErrorf(t, err, "failed to create %q database", praefectTemplateDatabase) + + t.Cleanup(func() { + if _, ok := getDatabaseEnvironment(t)["PGHOST_PGBOUNCER"]; ok { + pgbouncerCfg := dbCfg + // This database name will connect us to the special admin console. + pgbouncerCfg.DBName = "pgbouncer" + + // We cannot use `requireSQLOpen()` because it would ping the database, + // which is not supported by the PgBouncer admin console. + pgbouncerDB, err := sql.Open("postgres", glsql.DSN(pgbouncerCfg, false)) + require.NoError(t, err) + defer testhelper.MustClose(t, pgbouncerDB) + + // Trying to release connections like we do with the "normal" Postgres + // database regularly results in flaky tests with PgBouncer given that the + // connections are seemingly never released. Instead, we kill PgBouncer + // connections by connecting to its admin console and using the KILL + // command, which instructs it to kill all client and server connections. + _, err = pgbouncerDB.Exec("KILL " + database) + require.NoError(t, err, "killing PgBouncer connections") + } + + dbCfg.DBName = "postgres" + postgresDB := requireSQLOpen(t, dbCfg, true) + defer testhelper.MustClose(t, postgresDB) + + // We need to force-terminate open connections as for the tasks that use PgBouncer + // the actual client connected to the database is a PgBouncer and not a test that is + // running. + requireTerminateAllConnections(t, postgresDB, database) + + _, err = postgresDB.Exec("DROP DATABASE " + database) + require.NoErrorf(t, err, "failed to drop %q database", database) + }) + + // Connect to the testing database with optional PgBouncer + dbCfg.DBName = database + praefectTestDB := requireSQLOpen(t, dbCfg, false) + t.Cleanup(func() { + if err := praefectTestDB.Close(); !errors.Is(err, net.ErrClosed) { + require.NoErrorf(t, err, "release connection to the %q database", dbCfg.DBName) + } + }) + return praefectTestDB +} + +func databaseExist(t testing.TB, db *sql.DB, database string) bool { + return scanSingleBool(t, db, `SELECT EXISTS(SELECT * FROM pg_database WHERE datname = $1)`, database) +} + +func scanSingleBool(t testing.TB, db *sql.DB, query string, args ...interface{}) bool { + var flag bool + row := db.QueryRow(query, args...) + require.NoError(t, row.Scan(&flag)) + return flag +} + +var ( + // Running `gdk env` takes about 250ms on my system and is thus comparatively slow. When + // running with Praefect as proxy, this time adds up and may thus slow down tests by quite a + // margin. We thus amortize these costs by only running it once. + databaseEnvOnce sync.Once + databaseEnv map[string]string +) + +func getDatabaseEnvironment(t testing.TB) map[string]string { + databaseEnvOnce.Do(func() { + envvars := map[string]string{} + + // We only process output if `gdk env` returned success. If it didn't, we simply assume that + // we are not running in a GDK environment and will try to extract variables from the + // environment instead. + if output, err := exec.Command("gdk", "env").Output(); err == nil { + for _, line := range strings.Split(string(output), "\n") { + const prefix = "export " + if !strings.HasPrefix(line, prefix) { + continue + } + + split := strings.SplitN(strings.TrimPrefix(line, prefix), "=", 2) + if len(split) != 2 { + continue + } + + envvars[split[0]] = split[1] + } + } + + for _, key := range []string{"PGHOST", "PGPORT", "PGUSER", "PGHOST_PGBOUNCER", "PGPORT_PGBOUNCER"} { + if _, ok := envvars[key]; !ok { + value, ok := os.LookupEnv(key) + if ok { + envvars[key] = value + } + } + } + + databaseEnv = envvars + }) + + return databaseEnv +} + +// WaitForBlockedQuery is a helper that waits until a blocked query matching the prefix is present in the +// database. This is useful for ensuring another transaction is blocking a query when testing concurrent +// execution of multiple queries. +func WaitForBlockedQuery(ctx context.Context, t testing.TB, db glsql.Querier, queryPrefix string) { + t.Helper() + + for { + var queryBlocked bool + require.NoError(t, db.QueryRowContext(ctx, ` + SELECT EXISTS ( + SELECT FROM pg_stat_activity + WHERE TRIM(e'\n' FROM query) LIKE $1 + AND state = 'active' + AND wait_event_type = 'Lock' + AND datname = current_database() + ) + `, queryPrefix+"%").Scan(&queryBlocked)) + + if queryBlocked { + return + } + + retry := time.NewTimer(time.Millisecond) + select { + case <-ctx.Done(): + retry.Stop() + return + case <-retry.C: + } + } } diff --git a/internal/praefect/datastore/glsql/testing_test.go b/internal/testhelper/testdb/db_test.go index 26a4f499c..a3687d977 100644 --- a/internal/praefect/datastore/glsql/testing_test.go +++ b/internal/testhelper/testdb/db_test.go @@ -1,4 +1,4 @@ -package glsql +package testdb import ( "testing" @@ -8,7 +8,7 @@ import ( func TestDB_Truncate(t *testing.T) { t.Parallel() - db := NewDB(t) + db := New(t) _, err := db.Exec("CREATE TABLE truncate_tbl(id BIGSERIAL PRIMARY KEY)") require.NoError(t, err) diff --git a/internal/testhelper/testdb/health.go b/internal/testhelper/testdb/health.go new file mode 100644 index 000000000..73c5d39f8 --- /dev/null +++ b/internal/testhelper/testdb/health.go @@ -0,0 +1,51 @@ +package testdb + +import ( + "context" + "testing" + + "github.com/lib/pq" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" +) + +// SetHealthyNodes sets the healthy nodes in the database as determined by the passed in map. The healthyNodes map is keyed by +// praefect name -> virtual storage -> storage. On each run, it clears all previous health checks from the table, so the +// passed in nodes are the only ones considered healthy after the function. As the healthy nodes are determined by the time of +// the last successful health check, this should be run in the same transastion as the tested query to prevent flakiness. +// +//nolint:revive +func SetHealthyNodes(t testing.TB, ctx context.Context, db glsql.Querier, healthyNodes map[string]map[string][]string) { + t.Helper() + + var praefects, virtualStorages, storages []string + for praefect, virtualStors := range healthyNodes { + for virtualStorage, stors := range virtualStors { + for _, storage := range stors { + praefects = append(praefects, praefect) + virtualStorages = append(virtualStorages, virtualStorage) + storages = append(storages, storage) + } + } + } + + _, err := db.ExecContext(ctx, ` +WITH clear_previous_checks AS ( DELETE FROM node_status ) + +INSERT INTO node_status (praefect_name, shard_name, node_name, last_contact_attempt_at, last_seen_active_at) +SELECT + unnest($1::text[]) AS praefect_name, + unnest($2::text[]) AS shard_name, + unnest($3::text[]) AS node_name, + NOW() AS last_contact_attempt_at, + NOW() AS last_seen_active_at +ON CONFLICT (praefect_name, shard_name, node_name) DO UPDATE SET + last_contact_attempt_at = NOW(), + last_seen_active_at = NOW() + `, + pq.StringArray(praefects), + pq.StringArray(virtualStorages), + pq.StringArray(storages), + ) + require.NoError(t, err) +} diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index d01b80fc2..0e48d6622 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -31,10 +31,10 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v14/internal/gitlab" praefectconfig "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/streamcache" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" "google.golang.org/grpc" "google.golang.org/grpc/health" healthpb "google.golang.org/grpc/health/grpc_health_v1" @@ -85,7 +85,7 @@ func StartGitalyServer(t testing.TB, cfg config.Cfg, rubyServer *rubyserver.Serv // createDatabase create a new database with randomly generated name and returns it back to the caller. func createDatabase(t testing.TB) string { - db := glsql.NewDB(t) + db := testdb.New(t) return db.Name } @@ -113,7 +113,7 @@ func runPraefectProxy(t testing.TB, cfg config.Cfg, gitalyAddr, praefectBinPath Auth: auth.Config{ Token: cfg.Auth.Token, }, - DB: glsql.GetDBConfig(t, dbName), + DB: testdb.GetConfig(t, dbName), Failover: praefectconfig.Failover{ Enabled: true, ElectionStrategy: praefectconfig.ElectionStrategyLocal, |