Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'internal/praefect')
-rw-r--r--internal/praefect/auth_test.go3
-rw-r--r--internal/praefect/coordinator_pg_test.go10
-rw-r--r--internal/praefect/coordinator_test.go30
-rw-r--r--internal/praefect/datastore/assignment_test.go11
-rw-r--r--internal/praefect/datastore/collector_test.go6
-rw-r--r--internal/praefect/datastore/glsql/init_test.go7
-rw-r--r--internal/praefect/datastore/glsql/postgres_test.go3
-rw-r--r--internal/praefect/datastore/glsql/testing.go6
-rw-r--r--internal/praefect/datastore/glsql/testing_test.go3
-rw-r--r--internal/praefect/datastore/init_test.go9
-rw-r--r--internal/praefect/datastore/listener_postgres_test.go18
-rw-r--r--internal/praefect/datastore/postgres_test.go3
-rw-r--r--internal/praefect/datastore/queue_bm_test.go3
-rw-r--r--internal/praefect/datastore/queue_test.go46
-rw-r--r--internal/praefect/datastore/repository_store_bm_test.go3
-rw-r--r--internal/praefect/datastore/repository_store_test.go9
-rw-r--r--internal/praefect/helper_test.go3
-rw-r--r--internal/praefect/info_service_test.go1
-rw-r--r--internal/praefect/nodes/health_manager_test.go5
-rw-r--r--internal/praefect/nodes/init_test.go3
-rw-r--r--internal/praefect/nodes/per_repository_test.go5
-rw-r--r--internal/praefect/nodes/sql_elector_test.go27
-rw-r--r--internal/praefect/protoregistry/protoregistry_test.go1
-rw-r--r--internal/praefect/reconciler/reconciler_benchmark_test.go2
-rw-r--r--internal/praefect/reconciler/reconciler_test.go5
-rw-r--r--internal/praefect/replicator_pg_test.go8
-rw-r--r--internal/praefect/replicator_test.go19
-rw-r--r--internal/praefect/repository_exists_test.go5
-rw-r--r--internal/praefect/server_factory_test.go4
-rw-r--r--internal/praefect/server_test.go19
-rw-r--r--internal/praefect/transaction_test.go8
31 files changed, 187 insertions, 98 deletions
diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go
index 6394c8aa3..9ed2b146f 100644
--- a/internal/praefect/auth_test.go
+++ b/internal/praefect/auth_test.go
@@ -11,6 +11,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config/auth"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/mock"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry"
@@ -148,7 +149,7 @@ func runServer(t *testing.T, token string, required bool) (*grpc.Server, string,
},
}
logEntry := testhelper.DiscardTestEntry(t)
- queue := datastore.NewPostgresReplicationEventQueue(getDB(t))
+ queue := datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t))
nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil)
require.NoError(t, err)
diff --git a/internal/praefect/coordinator_pg_test.go b/internal/praefect/coordinator_pg_test.go
index c905d2ceb..e1764b07a 100644
--- a/internal/praefect/coordinator_pg_test.go
+++ b/internal/praefect/coordinator_pg_test.go
@@ -25,11 +25,8 @@ import (
"google.golang.org/protobuf/proto"
)
-func getDB(t testing.TB) glsql.DB {
- return glsql.GetDB(t)
-}
-
func TestStreamDirectorMutator_Transaction(t *testing.T) {
+ t.Parallel()
// For the test-with-praefect execution we disable a special case when repository
// records need to be created in the database.
defer testhelper.ModifyEnvironment(t, "GITALY_TEST_PRAEFECT_BIN", "")()
@@ -158,8 +155,12 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
},
}
+ db := glsql.NewDB(t)
+
for _, tc := range testcases {
t.Run(tc.desc, func(t *testing.T) {
+ db.TruncateAll(t)
+
storageNodes := make([]*config.Node, 0, len(tc.nodes))
for i := range tc.nodes {
socket := testhelper.GetTemporaryGitalySocketFileName(t)
@@ -178,7 +179,6 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
}
var replicationWaitGroup sync.WaitGroup
- db := getDB(t)
queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db))
queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
defer replicationWaitGroup.Done()
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index dabe0fd17..cd9557213 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -24,6 +24,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/grpc-proxy/proxy"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/mock"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
@@ -59,6 +60,8 @@ func TestSecondaryRotation(t *testing.T) {
}
func TestStreamDirectorReadOnlyEnforcement(t *testing.T) {
+ t.Parallel()
+ db := glsql.NewDB(t)
for _, tc := range []struct {
desc string
readOnly bool
@@ -67,6 +70,8 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) {
{desc: "read-only", readOnly: true},
} {
t.Run(tc.desc, func(t *testing.T) {
+ db.TruncateAll(t)
+
const (
virtualStorage = "test-virtual-storage"
relativePath = "test-repository"
@@ -99,7 +104,7 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) {
}
coordinator := NewCoordinator(
- datastore.NewPostgresReplicationEventQueue(getDB(t)),
+ datastore.NewPostgresReplicationEventQueue(db),
rs,
NewNodeManagerRouter(&nodes.MockManager{GetShardFunc: func(vs string) (nodes.Shard, error) {
require.Equal(t, virtualStorage, vs)
@@ -132,6 +137,7 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) {
}
func TestStreamDirectorMutator(t *testing.T) {
+ t.Parallel()
gitalySocket0, gitalySocket1 := testhelper.GetTemporaryGitalySocketFileName(t), testhelper.GetTemporaryGitalySocketFileName(t)
testhelper.NewServerWithHealth(t, gitalySocket0)
testhelper.NewServerWithHealth(t, gitalySocket1)
@@ -150,7 +156,7 @@ func TestStreamDirectorMutator(t *testing.T) {
var replEventWait sync.WaitGroup
- queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(getDB(t)))
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)))
queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
defer replEventWait.Done()
return queue.Enqueue(ctx, event)
@@ -236,6 +242,7 @@ func TestStreamDirectorMutator(t *testing.T) {
}
func TestStreamDirectorMutator_StopTransaction(t *testing.T) {
+ t.Parallel()
socket := testhelper.GetTemporaryGitalySocketFileName(t)
testhelper.NewServerWithHealth(t, socket)
@@ -281,7 +288,7 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) {
txMgr := transactions.NewManager(conf)
coordinator := NewCoordinator(
- datastore.NewPostgresReplicationEventQueue(getDB(t)),
+ datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)),
rs,
NewNodeManagerRouter(nodeMgr, rs),
txMgr,
@@ -355,6 +362,7 @@ func (m mockRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage,
}
func TestStreamDirectorAccessor(t *testing.T) {
+ t.Parallel()
gitalySocket := testhelper.GetTemporaryGitalySocketFileName(t)
testhelper.NewServerWithHealth(t, gitalySocket)
@@ -373,7 +381,7 @@ func TestStreamDirectorAccessor(t *testing.T) {
},
}
- queue := datastore.NewPostgresReplicationEventQueue(getDB(t))
+ queue := datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t))
targetRepo := gitalypb.Repository{
StorageName: "praefect",
@@ -452,6 +460,7 @@ func TestStreamDirectorAccessor(t *testing.T) {
}
func TestCoordinatorStreamDirector_distributesReads(t *testing.T) {
+ t.Parallel()
gitalySocket0, gitalySocket1 := testhelper.GetTemporaryGitalySocketFileName(t), testhelper.GetTemporaryGitalySocketFileName(t)
primaryHealthSrv := testhelper.NewServerWithHealth(t, gitalySocket0)
healthSrv := testhelper.NewServerWithHealth(t, gitalySocket1)
@@ -478,7 +487,7 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) {
},
}
- queue := datastore.NewPostgresReplicationEventQueue(getDB(t))
+ queue := datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t))
targetRepo := gitalypb.Repository{
StorageName: "praefect",
@@ -731,10 +740,13 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) {
}
func TestStreamDirector_repo_creation(t *testing.T) {
+ t.Parallel()
// For the test-with-praefect execution we disable a special case when repository
// records need to be created in the database.
defer testhelper.ModifyEnvironment(t, "GITALY_TEST_PRAEFECT_BIN", "")()
+ db := glsql.NewDB(t)
+
for _, tc := range []struct {
desc string
electionStrategy config.ElectionStrategy
@@ -764,6 +776,7 @@ func TestStreamDirector_repo_creation(t *testing.T) {
},
} {
t.Run(tc.desc, func(t *testing.T) {
+ db.TruncateAll(t)
primaryNode := &config.Node{Storage: "praefect-internal-1"}
healthySecondaryNode := &config.Node{Storage: "praefect-internal-2"}
unhealthySecondaryNode := &config.Node{Storage: "praefect-internal-3"}
@@ -779,7 +792,7 @@ func TestStreamDirector_repo_creation(t *testing.T) {
}
var replEventWait sync.WaitGroup
- queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(getDB(t)))
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db))
queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
defer replEventWait.Done()
return queue.Enqueue(ctx, event)
@@ -981,6 +994,7 @@ func (m *mockPeeker) Modify(payload []byte) error {
}
func TestAbsentCorrelationID(t *testing.T) {
+ t.Parallel()
gitalySocket0, gitalySocket1 := testhelper.GetTemporaryGitalySocketFileName(t), testhelper.GetTemporaryGitalySocketFileName(t)
healthSrv0 := testhelper.NewServerWithHealth(t, gitalySocket0)
healthSrv1 := testhelper.NewServerWithHealth(t, gitalySocket1)
@@ -1008,7 +1022,7 @@ func TestAbsentCorrelationID(t *testing.T) {
var replEventWait sync.WaitGroup
- queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(getDB(t)))
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)))
queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
defer replEventWait.Done()
return queue.Enqueue(ctx, event)
@@ -1066,6 +1080,7 @@ func TestAbsentCorrelationID(t *testing.T) {
}
func TestCoordinatorEnqueueFailure(t *testing.T) {
+ t.Parallel()
conf := config.Config{
VirtualStorages: []*config.VirtualStorage{
&config.VirtualStorage{
@@ -1399,6 +1414,7 @@ func (c *mockDiskCache) StartLease(*gitalypb.Repository) (cache.LeaseEnder, erro
// fails. Most importantly, we want to make sure to only ever forward errors from the primary and
// never from the secondaries.
func TestCoordinator_grpcErrorHandling(t *testing.T) {
+ t.Parallel()
praefectConfig := config.Config{
VirtualStorages: []*config.VirtualStorage{
&config.VirtualStorage{
diff --git a/internal/praefect/datastore/assignment_test.go b/internal/praefect/datastore/assignment_test.go
index 1859838cb..c8ac5ad84 100644
--- a/internal/praefect/datastore/assignment_test.go
+++ b/internal/praefect/datastore/assignment_test.go
@@ -4,16 +4,20 @@ import (
"testing"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
)
func TestAssignmentStore_GetHostAssignments(t *testing.T) {
+ t.Parallel()
type assignment struct {
virtualStorage string
relativePath string
storage string
}
+ db := glsql.NewDB(t)
+
configuredStorages := []string{"storage-1", "storage-2", "storage-3"}
for _, tc := range []struct {
desc string
@@ -71,7 +75,7 @@ func TestAssignmentStore_GetHostAssignments(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- db := getDB(t)
+ db.TruncateAll(t)
for _, assignment := range tc.existingAssignments {
_, err := db.ExecContext(ctx, `
@@ -98,6 +102,7 @@ func TestAssignmentStore_GetHostAssignments(t *testing.T) {
}
func TestAssignmentStore_SetReplicationFactor(t *testing.T) {
+ t.Parallel()
type matcher func(testing.TB, []string)
equal := func(expected []string) matcher {
@@ -114,6 +119,8 @@ func TestAssignmentStore_SetReplicationFactor(t *testing.T) {
}
}
+ db := glsql.NewDB(t)
+
for _, tc := range []struct {
desc string
existingAssignments []string
@@ -191,7 +198,7 @@ func TestAssignmentStore_SetReplicationFactor(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- db := getDB(t)
+ db.TruncateAll(t)
configuredStorages := map[string][]string{"virtual-storage": {"primary", "secondary-1", "secondary-2"}}
diff --git a/internal/praefect/datastore/collector_test.go b/internal/praefect/datastore/collector_test.go
index 842f18cb3..3514b1239 100644
--- a/internal/praefect/datastore/collector_test.go
+++ b/internal/praefect/datastore/collector_test.go
@@ -11,10 +11,12 @@ import (
"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
)
func TestRepositoryStoreCollector(t *testing.T) {
+ t.Parallel()
ctx, cancel := testhelper.Context()
defer cancel()
@@ -29,6 +31,8 @@ func TestRepositoryStoreCollector(t *testing.T) {
replicas replicas
}
+ db := glsql.NewDB(t)
+
for _, tc := range []struct {
desc string
healthyNodes []string
@@ -144,7 +148,7 @@ func TestRepositoryStoreCollector(t *testing.T) {
},
} {
t.Run(tc.desc, func(t *testing.T) {
- tx := getDB(t).Begin(t)
+ tx := db.Begin(t)
defer tx.Rollback(t)
testhelper.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{
diff --git a/internal/praefect/datastore/glsql/init_test.go b/internal/praefect/datastore/glsql/init_test.go
deleted file mode 100644
index 04fd46eb9..000000000
--- a/internal/praefect/datastore/glsql/init_test.go
+++ /dev/null
@@ -1,7 +0,0 @@
-package glsql
-
-import (
- "testing"
-)
-
-func getDB(t testing.TB) DB { return GetDB(t) }
diff --git a/internal/praefect/datastore/glsql/postgres_test.go b/internal/praefect/datastore/glsql/postgres_test.go
index 389ec959a..5616274bd 100644
--- a/internal/praefect/datastore/glsql/postgres_test.go
+++ b/internal/praefect/datastore/glsql/postgres_test.go
@@ -61,7 +61,8 @@ func TestUint64Provider(t *testing.T) {
}
func TestScanAll(t *testing.T) {
- db := getDB(t)
+ t.Parallel()
+ db := NewDB(t)
var ids Uint64Provider
notEmptyRows, err := db.Query("SELECT id FROM (VALUES (1), (200), (300500)) AS t(id)")
diff --git a/internal/praefect/datastore/glsql/testing.go b/internal/praefect/datastore/glsql/testing.go
index d37f05056..d1cc2806e 100644
--- a/internal/praefect/datastore/glsql/testing.go
+++ b/internal/praefect/datastore/glsql/testing.go
@@ -113,7 +113,7 @@ func (db DB) Close() error {
return nil
}
-// GetDB returns a wrapper around the database connection pool.
+// NewDB returns a wrapper around the database connection pool.
// Must be used only for testing.
// The new database with empty relations will be created for each call of this function.
// It uses env vars:
@@ -121,14 +121,14 @@ func (db DB) Close() error {
// PGPORT - required, binding port
// PGUSER - optional, user - `$ whoami` would be used if not provided
// Once the test is completed the database will be dropped on test cleanup execution.
-func GetDB(t testing.TB) DB {
+func NewDB(t testing.TB) DB {
t.Helper()
database := "praefect_" + strings.ReplaceAll(uuid.New().String(), "-", "")
return DB{DB: initPraefectTestDB(t, database), Name: database}
}
// GetDBConfig returns the database configuration determined by
-// environment variables. See GetDB() for the list of variables.
+// environment variables. See NewDB() for the list of variables.
func GetDBConfig(t testing.TB, database string) config.DB {
getEnvFromGDK(t)
diff --git a/internal/praefect/datastore/glsql/testing_test.go b/internal/praefect/datastore/glsql/testing_test.go
index e3e0e3cf2..26a4f499c 100644
--- a/internal/praefect/datastore/glsql/testing_test.go
+++ b/internal/praefect/datastore/glsql/testing_test.go
@@ -7,7 +7,8 @@ import (
)
func TestDB_Truncate(t *testing.T) {
- db := getDB(t)
+ t.Parallel()
+ db := NewDB(t)
_, err := db.Exec("CREATE TABLE truncate_tbl(id BIGSERIAL PRIMARY KEY)")
require.NoError(t, err)
diff --git a/internal/praefect/datastore/init_test.go b/internal/praefect/datastore/init_test.go
deleted file mode 100644
index 3a595a92e..000000000
--- a/internal/praefect/datastore/init_test.go
+++ /dev/null
@@ -1,9 +0,0 @@
-package datastore
-
-import (
- "testing"
-
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
-)
-
-func getDB(t testing.TB) glsql.DB { return glsql.GetDB(t) }
diff --git a/internal/praefect/datastore/listener_postgres_test.go b/internal/praefect/datastore/listener_postgres_test.go
index d46cfe563..53f1cc403 100644
--- a/internal/praefect/datastore/listener_postgres_test.go
+++ b/internal/praefect/datastore/listener_postgres_test.go
@@ -84,7 +84,8 @@ func (mlh mockListenHandler) Connected() {
}
func TestPostgresListener_Listen(t *testing.T) {
- db := getDB(t)
+ t.Parallel()
+ db := glsql.NewDB(t)
logger := testhelper.NewTestLogger(t)
@@ -365,7 +366,8 @@ func requireEqualNotificationEntries(t *testing.T, d string, entries []notificat
}
func TestPostgresListener_Listen_repositories_delete(t *testing.T) {
- db := getDB(t)
+ t.Parallel()
+ db := glsql.NewDB(t)
const channel = "repositories_updates"
@@ -397,7 +399,8 @@ func TestPostgresListener_Listen_repositories_delete(t *testing.T) {
}
func TestPostgresListener_Listen_storage_repositories_insert(t *testing.T) {
- db := getDB(t)
+ t.Parallel()
+ db := glsql.NewDB(t)
const channel = "storage_repositories_updates"
@@ -422,7 +425,8 @@ func TestPostgresListener_Listen_storage_repositories_insert(t *testing.T) {
}
func TestPostgresListener_Listen_storage_repositories_update(t *testing.T) {
- db := getDB(t)
+ t.Parallel()
+ db := glsql.NewDB(t)
const channel = "storage_repositories_updates"
@@ -446,7 +450,8 @@ func TestPostgresListener_Listen_storage_repositories_update(t *testing.T) {
}
func TestPostgresListener_Listen_storage_empty_notification(t *testing.T) {
- db := getDB(t)
+ t.Parallel()
+ db := glsql.NewDB(t)
const channel = "storage_repositories_updates"
@@ -464,7 +469,8 @@ func TestPostgresListener_Listen_storage_empty_notification(t *testing.T) {
}
func TestPostgresListener_Listen_storage_repositories_delete(t *testing.T) {
- db := getDB(t)
+ t.Parallel()
+ db := glsql.NewDB(t)
const channel = "storage_repositories_updates"
diff --git a/internal/praefect/datastore/postgres_test.go b/internal/praefect/datastore/postgres_test.go
index 2c8baaf26..9ae3a78be 100644
--- a/internal/praefect/datastore/postgres_test.go
+++ b/internal/praefect/datastore/postgres_test.go
@@ -9,7 +9,8 @@ import (
)
func TestMigrateStatus(t *testing.T) {
- db := getDB(t)
+ t.Parallel()
+ db := glsql.NewDB(t)
config := config.Config{
DB: glsql.GetDBConfig(t, db.Name),
diff --git a/internal/praefect/datastore/queue_bm_test.go b/internal/praefect/datastore/queue_bm_test.go
index 748f663ca..8fea25258 100644
--- a/internal/praefect/datastore/queue_bm_test.go
+++ b/internal/praefect/datastore/queue_bm_test.go
@@ -4,6 +4,7 @@ import (
"testing"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
)
@@ -30,7 +31,7 @@ func BenchmarkPostgresReplicationEventQueue_Acknowledge(b *testing.B) {
}
func benchmarkPostgresReplicationEventQueueAcknowledge(b *testing.B, setup map[JobState]int) {
- db := getDB(b)
+ db := glsql.NewDB(b)
ctx, cancel := testhelper.Context()
defer cancel()
diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go
index 720cc0d79..6ed781286 100644
--- a/internal/praefect/datastore/queue_test.go
+++ b/internal/praefect/datastore/queue_test.go
@@ -13,6 +13,8 @@ import (
)
func TestPostgresReplicationEventQueue_DeleteReplicaUniqueIndex(t *testing.T) {
+ t.Parallel()
+ db := glsql.NewDB(t)
for _, tc := range []struct {
desc string
existingJob *ReplicationEvent
@@ -117,7 +119,7 @@ func TestPostgresReplicationEventQueue_DeleteReplicaUniqueIndex(t *testing.T) {
},
} {
t.Run(tc.desc, func(t *testing.T) {
- db := getDB(t)
+ db.TruncateAll(t)
ctx, cancel := testhelper.Context()
defer cancel()
@@ -151,7 +153,8 @@ func TestPostgresReplicationEventQueue_DeleteReplicaUniqueIndex(t *testing.T) {
}
func TestPostgresReplicationEventQueue_Enqueue(t *testing.T) {
- db := getDB(t)
+ t.Parallel()
+ db := glsql.NewDB(t)
ctx, cancel := testhelper.Context()
defer cancel()
@@ -197,7 +200,8 @@ func TestPostgresReplicationEventQueue_Enqueue(t *testing.T) {
}
func TestPostgresReplicationEventQueue_DeleteReplicaInfiniteAttempts(t *testing.T) {
- queue := NewPostgresReplicationEventQueue(getDB(t))
+ t.Parallel()
+ queue := NewPostgresReplicationEventQueue(glsql.NewDB(t))
ctx, cancel := testhelper.Context()
defer cancel()
@@ -247,7 +251,8 @@ func TestPostgresReplicationEventQueue_DeleteReplicaInfiniteAttempts(t *testing.
}
func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) {
- db := getDB(t)
+ t.Parallel()
+ db := glsql.NewDB(t)
ctx, cancel := testhelper.Context()
defer cancel()
@@ -380,7 +385,8 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) {
}
func TestPostgresReplicationEventQueue_Dequeue(t *testing.T) {
- db := getDB(t)
+ t.Parallel()
+ db := glsql.NewDB(t)
ctx, cancel := testhelper.Context()
defer cancel()
@@ -428,7 +434,8 @@ func TestPostgresReplicationEventQueue_Dequeue(t *testing.T) {
// expected results are listed as literals on purpose to be more explicit about what is going on with data
func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) {
- db := getDB(t)
+ t.Parallel()
+ db := glsql.NewDB(t)
ctx, cancel := testhelper.Context()
defer cancel()
@@ -537,7 +544,8 @@ func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) {
}
func TestPostgresReplicationEventQueue_DequeueSameStorageOtherRepository(t *testing.T) {
- db := getDB(t)
+ t.Parallel()
+ db := glsql.NewDB(t)
ctx, cancel := testhelper.Context()
defer cancel()
@@ -599,7 +607,8 @@ func TestPostgresReplicationEventQueue_DequeueSameStorageOtherRepository(t *test
}
func TestPostgresReplicationEventQueue_Acknowledge(t *testing.T) {
- db := getDB(t)
+ t.Parallel()
+ db := glsql.NewDB(t)
ctx, cancel := testhelper.Context()
defer cancel()
@@ -642,7 +651,8 @@ func TestPostgresReplicationEventQueue_Acknowledge(t *testing.T) {
}
func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
- db := getDB(t)
+ t.Parallel()
+ db := glsql.NewDB(t)
ctx, cancel := testhelper.Context()
defer cancel()
@@ -813,6 +823,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
}
func TestPostgresReplicationEventQueue_StartHealthUpdate(t *testing.T) {
+ t.Parallel()
eventType1 := ReplicationEvent{Job: ReplicationJob{
Change: UpdateRepo,
VirtualStorage: "vs-1",
@@ -830,6 +841,8 @@ func TestPostgresReplicationEventQueue_StartHealthUpdate(t *testing.T) {
eventType4 := eventType1
eventType4.Job.TargetNodeStorage = "s-2"
+ db := glsql.NewDB(t)
+
t.Run("no events is valid", func(t *testing.T) {
// 'qc' is not initialized, so the test will fail if there will be an attempt to make SQL operation
queue := NewPostgresReplicationEventQueue(nil)
@@ -851,7 +864,7 @@ func TestPostgresReplicationEventQueue_StartHealthUpdate(t *testing.T) {
})
t.Run("stops after first error", func(t *testing.T) {
- db := getDB(t)
+ db.TruncateAll(t)
ctx, cancel := testhelper.Context()
defer cancel()
@@ -870,7 +883,7 @@ func TestPostgresReplicationEventQueue_StartHealthUpdate(t *testing.T) {
})
t.Run("stops if nothing to update (extended coverage)", func(t *testing.T) {
- db := getDB(t)
+ db.TruncateAll(t)
ctx, cancel := testhelper.Context()
defer cancel()
@@ -894,7 +907,7 @@ func TestPostgresReplicationEventQueue_StartHealthUpdate(t *testing.T) {
})
t.Run("triggers all passed in events", func(t *testing.T) {
- db := getDB(t)
+ db.TruncateAll(t)
var wg sync.WaitGroup
ctx, cancel := testhelper.Context()
@@ -951,6 +964,7 @@ func TestPostgresReplicationEventQueue_StartHealthUpdate(t *testing.T) {
}
func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) {
+ t.Parallel()
ctx, cancel := testhelper.Context()
defer cancel()
@@ -974,8 +988,10 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) {
eventType4 := eventType3
eventType4.Job.TargetNodeStorage = "gitaly-3"
+ db := glsql.NewDB(t)
+
t.Run("no stale jobs yet", func(t *testing.T) {
- db := getDB(t)
+ db.TruncateAll(t)
source := NewPostgresReplicationEventQueue(db)
event, err := source.Enqueue(ctx, eventType1)
@@ -990,7 +1006,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) {
})
t.Run("jobs considered stale only at 'in_progress' state", func(t *testing.T) {
- db := getDB(t)
+ db.TruncateAll(t)
source := NewPostgresReplicationEventQueue(db)
// move event to 'ready' state
@@ -1029,7 +1045,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) {
})
t.Run("stale jobs updated for all virtual storages and storages at once", func(t *testing.T) {
- db := getDB(t)
+ db.TruncateAll(t)
source := NewPostgresReplicationEventQueue(db)
var exp []ReplicationEvent
diff --git a/internal/praefect/datastore/repository_store_bm_test.go b/internal/praefect/datastore/repository_store_bm_test.go
index 548b3fdee..f39750114 100644
--- a/internal/praefect/datastore/repository_store_bm_test.go
+++ b/internal/praefect/datastore/repository_store_bm_test.go
@@ -5,6 +5,7 @@ import (
"testing"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
)
@@ -37,7 +38,7 @@ func BenchmarkPostgresRepositoryStore_GetConsistentStorages(b *testing.B) {
}
func benchmarkGetConsistentStorages(b *testing.B, nstorages, nrepositories int) {
- db := getDB(b)
+ db := glsql.NewDB(b)
ctx, cancel := testhelper.Context()
defer cancel()
diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go
index 1df2786e9..22db3177b 100644
--- a/internal/praefect/datastore/repository_store_test.go
+++ b/internal/praefect/datastore/repository_store_test.go
@@ -8,6 +8,7 @@ import (
"github.com/lib/pq"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
)
@@ -29,8 +30,10 @@ type requireState func(t *testing.T, ctx context.Context, vss virtualStorageStat
type repositoryStoreFactory func(t *testing.T, storages map[string][]string) (RepositoryStore, requireState)
func TestRepositoryStore_Postgres(t *testing.T) {
+ t.Parallel()
+ db := glsql.NewDB(t)
testRepositoryStore(t, func(t *testing.T, storages map[string][]string) (RepositoryStore, requireState) {
- db := getDB(t)
+ db.TruncateAll(t)
gs := NewPostgresRepositoryStore(db, storages)
requireVirtualStorageState := func(t *testing.T, ctx context.Context, exp virtualStorageState) {
@@ -840,6 +843,8 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
}
func TestPostgresRepositoryStore_GetPartiallyAvailableRepositories(t *testing.T) {
+ t.Parallel()
+ db := glsql.NewDB(t)
for _, tc := range []struct {
desc string
nonExistentRepository bool
@@ -1018,7 +1023,7 @@ func TestPostgresRepositoryStore_GetPartiallyAvailableRepositories(t *testing.T)
ctx, cancel := testhelper.Context()
defer cancel()
- tx := getDB(t).Begin(t)
+ tx := db.Begin(t)
defer tx.Rollback(t)
configuredStorages := map[string][]string{"virtual-storage": {"primary", "secondary-1"}}
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index 7d922cb35..bcf587505 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -15,6 +15,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/log"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/grpc-proxy/proxy"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/mock"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
@@ -115,7 +116,7 @@ func withMockBackends(t testing.TB, backends map[string]mock.SimpleServiceServer
}
func defaultQueue(t testing.TB) datastore.ReplicationEventQueue {
- return datastore.NewPostgresReplicationEventQueue(getDB(t))
+ return datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t))
}
func defaultTxMgr(conf config.Config) *transactions.Manager {
diff --git a/internal/praefect/info_service_test.go b/internal/praefect/info_service_test.go
index 4dc8f755e..e289040ad 100644
--- a/internal/praefect/info_service_test.go
+++ b/internal/praefect/info_service_test.go
@@ -22,6 +22,7 @@ import (
)
func TestInfoService_RepositoryReplicas(t *testing.T) {
+ t.Parallel()
var cfgs []gconfig.Cfg
var cfgNodes []*config.Node
var testRepo *gitalypb.Repository
diff --git a/internal/praefect/nodes/health_manager_test.go b/internal/praefect/nodes/health_manager_test.go
index 4e4a4bf0e..07b359090 100644
--- a/internal/praefect/nodes/health_manager_test.go
+++ b/internal/praefect/nodes/health_manager_test.go
@@ -24,6 +24,7 @@ func (m mockHealthClient) Check(ctx context.Context, r *grpc_health_v1.HealthChe
}
func TestHealthManager(t *testing.T) {
+ t.Parallel()
ctx, cancel := testhelper.Context()
defer cancel()
@@ -37,6 +38,8 @@ func TestHealthManager(t *testing.T) {
HealthConsensus map[string][]string
}
+ db := glsql.NewDB(t)
+
for _, tc := range []struct {
desc string
healthChecks HealthChecks
@@ -470,7 +473,7 @@ func TestHealthManager(t *testing.T) {
},
} {
t.Run(tc.desc, func(t *testing.T) {
- db := getDB(t)
+ db.TruncateAll(t)
healthStatus := map[string]grpc_health_v1.HealthCheckResponse_ServingStatus{}
// healthManagers are cached in order to keep the internal state intact between different
diff --git a/internal/praefect/nodes/init_test.go b/internal/praefect/nodes/init_test.go
index 7ec527cb8..678b275ea 100644
--- a/internal/praefect/nodes/init_test.go
+++ b/internal/praefect/nodes/init_test.go
@@ -4,7 +4,6 @@ import (
"os"
"testing"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
)
@@ -18,5 +17,3 @@ func testMain(m *testing.M) (code int) {
defer cleanup()
return m.Run()
}
-
-func getDB(t testing.TB) glsql.DB { return glsql.GetDB(t) }
diff --git a/internal/praefect/nodes/per_repository_test.go b/internal/praefect/nodes/per_repository_test.go
index 0ed62e266..b37071251 100644
--- a/internal/praefect/nodes/per_repository_test.go
+++ b/internal/praefect/nodes/per_repository_test.go
@@ -14,6 +14,7 @@ import (
)
func TestPerRepositoryElector(t *testing.T) {
+ t.Parallel()
ctx, cancel := testhelper.Context()
defer cancel()
@@ -45,6 +46,8 @@ func TestPerRepositoryElector(t *testing.T) {
primary matcher
}
+ db := glsql.NewDB(t)
+
for _, tc := range []struct {
desc string
state state
@@ -482,7 +485,7 @@ func TestPerRepositoryElector(t *testing.T) {
},
} {
t.Run(tc.desc, func(t *testing.T) {
- db := getDB(t)
+ db.TruncateAll(t)
rs := datastore.NewPostgresRepositoryStore(db, nil)
for virtualStorage, relativePaths := range tc.state {
diff --git a/internal/praefect/nodes/sql_elector_test.go b/internal/praefect/nodes/sql_elector_test.go
index fb649f934..09d0a89c2 100644
--- a/internal/praefect/nodes/sql_elector_test.go
+++ b/internal/praefect/nodes/sql_elector_test.go
@@ -28,7 +28,8 @@ import (
var shardName string = "test-shard-0"
func TestGetPrimaryAndSecondaries(t *testing.T) {
- db := getDB(t)
+ t.Parallel()
+ db := glsql.NewDB(t)
logger := testhelper.NewTestLogger(t).WithField("test", t.Name())
praefectSocket := testhelper.GetTemporaryGitalySocketFileName(t)
@@ -71,7 +72,8 @@ func TestGetPrimaryAndSecondaries(t *testing.T) {
}
func TestSqlElector_slow_execution(t *testing.T) {
- db := getDB(t)
+ t.Parallel()
+ db := glsql.NewDB(t)
praefectSocket := "unix://" + testhelper.GetTemporaryGitalySocketFileName(t)
logger := testhelper.NewTestLogger(t).WithField("test", t.Name())
@@ -108,7 +110,8 @@ func TestSqlElector_slow_execution(t *testing.T) {
}
func TestBasicFailover(t *testing.T) {
- db := getDB(t)
+ t.Parallel()
+ db := glsql.NewDB(t)
logger := testhelper.NewTestLogger(t).WithField("test", t.Name())
praefectSocket := testhelper.GetTemporaryGitalySocketFileName(t)
@@ -216,16 +219,15 @@ func TestBasicFailover(t *testing.T) {
}
func TestElectDemotedPrimary(t *testing.T) {
- db := getDB(t)
-
- tx := getDB(t).Begin(t)
+ t.Parallel()
+ tx := glsql.NewDB(t).Begin(t)
defer tx.Rollback(t)
node := config.Node{Storage: "gitaly-0"}
elector := newSQLElector(
shardName,
config.Config{},
- db.DB,
+ nil,
testhelper.DiscardTestLogger(t),
[]*nodeStatus{{node: node}},
)
@@ -283,7 +285,8 @@ func predateElection(t testing.TB, ctx context.Context, db glsql.Querier, shardN
}
func TestElectNewPrimary(t *testing.T) {
- db := getDB(t)
+ t.Parallel()
+ db := glsql.NewDB(t)
ns := []*nodeStatus{{
node: config.Node{
@@ -395,9 +398,7 @@ func TestElectNewPrimary(t *testing.T) {
for _, testCase := range testCases {
t.Run(testCase.desc, func(t *testing.T) {
- db.TruncateAll(t)
-
- tx := getDB(t).Begin(t)
+ tx := db.Begin(t)
defer tx.Rollback(t)
_, err := tx.Exec(testCase.initialReplQueueInsert)
@@ -423,6 +424,7 @@ func TestElectNewPrimary(t *testing.T) {
}
func TestConnectionMultiplexing(t *testing.T) {
+ t.Parallel()
errNonMuxed := status.Error(codes.Internal, "non-muxed connection")
errMuxed := status.Error(codes.Internal, "muxed connection")
@@ -453,6 +455,7 @@ func TestConnectionMultiplexing(t *testing.T) {
go srv.Serve(ln)
+ db := glsql.NewDB(t)
mgr, err := NewManager(
testhelper.DiscardTestEntry(t),
config.Config{
@@ -470,7 +473,7 @@ func TestConnectionMultiplexing(t *testing.T) {
},
},
},
- getDB(t).DB,
+ db.DB,
nil,
promtest.NewMockHistogramVec(),
protoregistry.GitalyProtoPreregistered,
diff --git a/internal/praefect/protoregistry/protoregistry_test.go b/internal/praefect/protoregistry/protoregistry_test.go
index d972f1898..36c90db92 100644
--- a/internal/praefect/protoregistry/protoregistry_test.go
+++ b/internal/praefect/protoregistry/protoregistry_test.go
@@ -85,7 +85,6 @@ func TestNewProtoRegistry(t *testing.T) {
"FindDefaultBranchName": protoregistry.OpAccessor,
"FindAllBranchNames": protoregistry.OpAccessor,
"FindAllTagNames": protoregistry.OpAccessor,
- "FindRefName": protoregistry.OpAccessor,
"FindLocalBranches": protoregistry.OpAccessor,
"FindAllBranches": protoregistry.OpAccessor,
"FindAllTags": protoregistry.OpAccessor,
diff --git a/internal/praefect/reconciler/reconciler_benchmark_test.go b/internal/praefect/reconciler/reconciler_benchmark_test.go
index fd9b946cf..bdb6f13ac 100644
--- a/internal/praefect/reconciler/reconciler_benchmark_test.go
+++ b/internal/praefect/reconciler/reconciler_benchmark_test.go
@@ -31,7 +31,7 @@ func benchmarkReconcile(b *testing.B, numRepositories int, worstCase bool) {
ctx, cancel := testhelper.Context()
defer cancel()
- db := glsql.GetDB(b)
+ db := glsql.NewDB(b)
behind := 0
if worstCase {
diff --git a/internal/praefect/reconciler/reconciler_test.go b/internal/praefect/reconciler/reconciler_test.go
index 465f4fee5..2fff58c63 100644
--- a/internal/praefect/reconciler/reconciler_test.go
+++ b/internal/praefect/reconciler/reconciler_test.go
@@ -18,6 +18,7 @@ import (
)
func TestReconciler(t *testing.T) {
+ t.Parallel()
// repositories describes storage state as
// virtual storage -> relative path -> physical storage -> generation
@@ -77,6 +78,8 @@ func TestReconciler(t *testing.T) {
return out
}
+ db := glsql.NewDB(t)
+
for _, tc := range []struct {
desc string
healthyStorages storages
@@ -1060,7 +1063,7 @@ func TestReconciler(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- db := glsql.GetDB(t)
+ db.TruncateAll(t)
// set up the repository generation records expected by the test case
rs := datastore.NewPostgresRepositoryStore(db, configuredStorages)
diff --git a/internal/praefect/replicator_pg_test.go b/internal/praefect/replicator_pg_test.go
index 7cd0a9f53..63e21b223 100644
--- a/internal/praefect/replicator_pg_test.go
+++ b/internal/praefect/replicator_pg_test.go
@@ -11,6 +11,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/client"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/repository"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"google.golang.org/grpc"
@@ -26,6 +27,7 @@ func (m *mockRepositoryService) ReplicateRepository(ctx context.Context, r *gita
}
func TestReplicatorInvalidSourceRepository(t *testing.T) {
+ t.Parallel()
ctx, cancel := testhelper.Context()
defer cancel()
@@ -47,7 +49,7 @@ func TestReplicatorInvalidSourceRepository(t *testing.T) {
targetCC, err := client.Dial(ln.Addr().Network()+":"+ln.Addr().String(), nil)
require.NoError(t, err)
- rs := datastore.NewPostgresRepositoryStore(getDB(t), nil)
+ rs := datastore.NewPostgresRepositoryStore(glsql.NewDB(t), nil)
require.NoError(t, rs.SetGeneration(ctx, "virtual-storage-1", "relative-path-1", "gitaly-1", 0))
r := &defaultReplicator{rs: rs, log: testhelper.DiscardTestLogger(t)}
@@ -66,6 +68,8 @@ func TestReplicatorInvalidSourceRepository(t *testing.T) {
}
func TestReplicatorDestroy(t *testing.T) {
+ t.Parallel()
+ db := glsql.NewDB(t)
for _, tc := range []struct {
change datastore.ChangeType
exists bool
@@ -76,7 +80,7 @@ func TestReplicatorDestroy(t *testing.T) {
{change: "invalid-type", exists: true, error: errors.New(`unknown change type: "invalid-type"`)},
} {
t.Run(string(tc.change), func(t *testing.T) {
- db := getDB(t)
+ db.TruncateAll(t)
rs := datastore.NewPostgresRepositoryStore(db, nil)
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 32b49afb7..0ad2dcd7f 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -26,6 +26,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/transactions"
@@ -56,6 +57,7 @@ func testMain(m *testing.M) int {
}
func TestReplMgr_ProcessBacklog(t *testing.T) {
+ t.Parallel()
primaryCfg, testRepo, testRepoPath := testcfg.BuildWithRepo(t, testcfg.WithStorages("primary"))
primaryCfg.SocketPath = testserver.RunGitalyServer(t, primaryCfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
testhelper.BuildGitalySSH(t, primaryCfg)
@@ -145,7 +147,7 @@ func TestReplMgr_ProcessBacklog(t *testing.T) {
logger := testhelper.DiscardTestLogger(t)
loggerHook := test.NewLocal(logger)
- queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(getDB(t)))
+ queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)))
queue.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) {
cancel() // when it is called we know that replication is finished
return queue.Acknowledge(ctx, state, ids)
@@ -263,6 +265,7 @@ func TestReplicatorDowngradeAttempt(t *testing.T) {
}
func TestReplicator_PropagateReplicationJob(t *testing.T) {
+ t.Parallel()
primaryStorage, secondaryStorage := "internal-gitaly-0", "internal-gitaly-1"
primCfg := testcfg.Build(t, testcfg.WithStorages(primaryStorage))
@@ -296,7 +299,7 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) {
// unlinkat /tmp/gitaly-222007427/381349228/storages.d/internal-gitaly-1/+gitaly/state/path/to/repo: directory not empty
// By using WaitGroup we are sure the test cleanup will be started after all replication
// requests are completed, so no running cache IO operations happen.
- queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(getDB(t)))
+ queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)))
var wg sync.WaitGroup
queue.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
wg.Add(1)
@@ -631,6 +634,7 @@ func getChecksumFunc(ctx context.Context, client gitalypb.RepositoryServiceClien
}
func TestProcessBacklog_FailedJobs(t *testing.T) {
+ t.Parallel()
primaryCfg, testRepo, _ := testcfg.BuildWithRepo(t, testcfg.WithStorages("default"))
primaryAddr := testserver.RunGitalyServer(t, primaryCfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
@@ -664,7 +668,7 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(getDB(t)))
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)))
processed := make(chan struct{})
dequeues := 0
@@ -752,6 +756,7 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
}
func TestProcessBacklog_Success(t *testing.T) {
+ t.Parallel()
primaryCfg, testRepo, _ := testcfg.BuildWithRepo(t, testcfg.WithStorages("primary"))
primaryCfg.SocketPath = testserver.RunGitalyServer(t, primaryCfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
testhelper.BuildGitalySSH(t, primaryCfg)
@@ -787,7 +792,7 @@ func TestProcessBacklog_Success(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(getDB(t)))
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)))
processed := make(chan struct{})
queueInterceptor.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) {
@@ -892,6 +897,7 @@ func TestProcessBacklog_Success(t *testing.T) {
}
func TestReplMgrProcessBacklog_OnlyHealthyNodes(t *testing.T) {
+ t.Parallel()
conf := config.Config{
VirtualStorages: []*config.VirtualStorage{
{
@@ -908,7 +914,7 @@ func TestReplMgrProcessBacklog_OnlyHealthyNodes(t *testing.T) {
ctx, cancel := testhelper.Context()
first := true
- queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(getDB(t)))
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)))
queueInterceptor.OnDequeue(func(_ context.Context, virtualStorageName string, storageName string, _ int, _ datastore.ReplicationEventQueue) ([]datastore.ReplicationEvent, error) {
select {
case <-ctx.Done():
@@ -969,6 +975,7 @@ func (m mockReplicator) Replicate(ctx context.Context, event datastore.Replicati
}
func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) {
+ t.Parallel()
ctx, cancel := testhelper.Context()
defer cancel()
@@ -991,7 +998,7 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) {
},
}
- queue := datastore.NewPostgresReplicationEventQueue(getDB(t))
+ queue := datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t))
_, err := queue.Enqueue(ctx, datastore.ReplicationEvent{
Job: datastore.ReplicationJob{
Change: datastore.UpdateRepo,
diff --git a/internal/praefect/repository_exists_test.go b/internal/praefect/repository_exists_test.go
index 5d6b294d9..75db7e81b 100644
--- a/internal/praefect/repository_exists_test.go
+++ b/internal/praefect/repository_exists_test.go
@@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/grpc-proxy/proxy"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
@@ -20,8 +21,10 @@ import (
)
func TestRepositoryExistsStreamInterceptor(t *testing.T) {
+ t.Parallel()
errServedByGitaly := status.Error(codes.Unknown, "request passed to Gitaly")
+ db := glsql.NewDB(t)
for _, tc := range []struct {
desc string
routeToGitaly bool
@@ -65,7 +68,7 @@ func TestRepositoryExistsStreamInterceptor(t *testing.T) {
},
} {
t.Run(tc.desc, func(t *testing.T) {
- db := getDB(t)
+ db.TruncateAll(t)
rs := datastore.NewPostgresRepositoryStore(db, map[string][]string{"virtual-storage": {"storage"}})
ctx, cancel := testhelper.Context()
diff --git a/internal/praefect/server_factory_test.go b/internal/praefect/server_factory_test.go
index 4a38e784a..53bbc825b 100644
--- a/internal/praefect/server_factory_test.go
+++ b/internal/praefect/server_factory_test.go
@@ -18,6 +18,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/helper/text"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/transactions"
@@ -32,6 +33,7 @@ import (
)
func TestServerFactory(t *testing.T) {
+ t.Parallel()
cfg, repo, repoPath := testcfg.BuildWithRepo(t)
gitalyAddr := testserver.RunGitalyServer(t, cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
@@ -61,7 +63,7 @@ func TestServerFactory(t *testing.T) {
revision := text.ChompBytes(gittest.Exec(t, cfg, "-C", repoPath, "rev-parse", "HEAD"))
logger := testhelper.DiscardTestEntry(t)
- queue := datastore.NewPostgresReplicationEventQueue(getDB(t))
+ queue := datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t))
rs := datastore.MockRepositoryStore{}
nodeMgr, err := nodes.NewManager(logger, conf, nil, rs, &promtest.MockHistogramVec{}, protoregistry.GitalyProtoPreregistered, nil, nil)
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index 717f11e90..47647be11 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -27,6 +27,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/listenmux"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/grpc-proxy/proxy"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/mock"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
@@ -112,6 +113,7 @@ func TestNewBackchannelServerFactory(t *testing.T) {
}
func TestGitalyServerInfo(t *testing.T) {
+ t.Parallel()
ctx, cancel := testhelper.Context()
defer cancel()
@@ -205,6 +207,7 @@ func TestGitalyServerInfo(t *testing.T) {
}
func TestGitalyServerInfoBadNode(t *testing.T) {
+ t.Parallel()
gitalySocket := testhelper.GetTemporaryGitalySocketFileName(t)
healthSrv := testhelper.NewServerWithHealth(t, gitalySocket)
healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN)
@@ -243,6 +246,7 @@ func TestGitalyServerInfoBadNode(t *testing.T) {
}
func TestDiskStatistics(t *testing.T) {
+ t.Parallel()
praefectCfg := config.Config{VirtualStorages: []*config.VirtualStorage{{Name: "praefect"}}}
for _, name := range []string{"gitaly-1", "gitaly-2"} {
gitalyCfg := testcfg.Build(t)
@@ -280,6 +284,7 @@ func TestDiskStatistics(t *testing.T) {
}
func TestHealthCheck(t *testing.T) {
+ t.Parallel()
ctx, cancel := testhelper.Context()
defer cancel()
@@ -301,6 +306,7 @@ func TestHealthCheck(t *testing.T) {
}
func TestRejectBadStorage(t *testing.T) {
+ t.Parallel()
conf := config.Config{
VirtualStorages: []*config.VirtualStorage{
{
@@ -333,6 +339,7 @@ func TestRejectBadStorage(t *testing.T) {
}
func TestWarnDuplicateAddrs(t *testing.T) {
+ t.Parallel()
conf := config.Config{
VirtualStorages: []*config.VirtualStorage{
&config.VirtualStorage{
@@ -462,6 +469,7 @@ func TestWarnDuplicateAddrs(t *testing.T) {
}
func TestRemoveRepository(t *testing.T) {
+ t.Parallel()
gitalyCfgs := make([]gconfig.Cfg, 3)
repos := make([][]*gitalypb.Repository, 3)
praefectCfg := config.Config{VirtualStorages: []*config.VirtualStorage{{Name: "praefect"}}}
@@ -494,7 +502,7 @@ func TestRemoveRepository(t *testing.T) {
// TODO: once https://gitlab.com/gitlab-org/gitaly/-/issues/2703 is done and the replication manager supports
// graceful shutdown, we can remove this code that waits for jobs to be complete
- queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(getDB(t)))
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)))
jobsDoneCh := make(chan struct{}, 2)
queueInterceptor.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) {
defer func() {
@@ -561,6 +569,7 @@ func pollUntilRemoved(t testing.TB, path string, deadline <-chan time.Time) {
}
func TestRenameRepository(t *testing.T) {
+ t.Parallel()
gitalyStorages := []string{"gitaly-1", "gitaly-2", "gitaly-3"}
repoPaths := make([]string, len(gitalyStorages))
praefectCfg := config.Config{
@@ -592,7 +601,7 @@ func TestRenameRepository(t *testing.T) {
var canCheckRepo sync.WaitGroup
canCheckRepo.Add(2)
- evq := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(getDB(t)))
+ evq := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)))
evq.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) {
defer canCheckRepo.Done()
return queue.Acknowledge(ctx, state, ids)
@@ -737,6 +746,7 @@ func newSmartHTTPGrpcServer(t *testing.T, cfg gconfig.Cfg, smartHTTPService gita
}
func TestProxyWrites(t *testing.T) {
+ t.Parallel()
txMgr := transactions.NewManager(config.Config{})
smartHTTP0, smartHTTP1, smartHTTP2 := &mockSmartHTTP{txMgr: txMgr}, &mockSmartHTTP{txMgr: txMgr}, &mockSmartHTTP{txMgr: txMgr}
@@ -772,7 +782,7 @@ func TestProxyWrites(t *testing.T) {
},
}
- queue := datastore.NewPostgresReplicationEventQueue(getDB(t))
+ queue := datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t))
entry := testhelper.DiscardTestEntry(t)
nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil)
@@ -856,6 +866,7 @@ func TestProxyWrites(t *testing.T) {
}
func TestErrorThreshold(t *testing.T) {
+ t.Parallel()
backendToken := ""
backend, cleanup := newMockDownstream(t, backendToken, &mockSvc{
repoMutatorUnary: func(ctx context.Context, req *mock.RepoRequest) (*emptypb.Empty, error) {
@@ -906,7 +917,7 @@ func TestErrorThreshold(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- queue := datastore.NewPostgresReplicationEventQueue(getDB(t))
+ queue := datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t))
entry := testhelper.DiscardTestEntry(t)
testCases := []struct {
diff --git a/internal/praefect/transaction_test.go b/internal/praefect/transaction_test.go
index 80da5b777..7f79c1430 100644
--- a/internal/praefect/transaction_test.go
+++ b/internal/praefect/transaction_test.go
@@ -70,6 +70,7 @@ func verifyCounterMetrics(t *testing.T, manager *transactions.Manager, expected
}
func TestTransactionSucceeds(t *testing.T) {
+ t.Parallel()
ctx, cancel := testhelper.Context()
defer cancel()
@@ -110,6 +111,7 @@ func TestTransactionSucceeds(t *testing.T) {
}
func TestTransactionWithMultipleNodes(t *testing.T) {
+ t.Parallel()
testcases := []struct {
desc string
nodes []string
@@ -220,6 +222,7 @@ func TestTransactionWithMultipleNodes(t *testing.T) {
}
func TestTransactionWithContextCancellation(t *testing.T) {
+ t.Parallel()
ctx, cancel := testhelper.Context()
defer cancel()
@@ -330,6 +333,7 @@ func TestTransactionRegistrationWithInvalidThresholdFails(t *testing.T) {
}
func TestTransactionReachesQuorum(t *testing.T) {
+ t.Parallel()
tc := []struct {
desc string
voters []voter
@@ -459,6 +463,7 @@ func TestTransactionReachesQuorum(t *testing.T) {
}
func TestTransactionWithMultipleVotes(t *testing.T) {
+ t.Parallel()
type multiVoter struct {
voteCount uint
votes []string
@@ -567,6 +572,7 @@ func TestTransactionWithMultipleVotes(t *testing.T) {
}
func TestTransactionFailures(t *testing.T) {
+ t.Parallel()
ctx, cancel := testhelper.Context()
defer cancel()
@@ -595,6 +601,7 @@ func TestTransactionFailures(t *testing.T) {
}
func TestTransactionCancellation(t *testing.T) {
+ t.Parallel()
testcases := []struct {
desc string
voters []voter
@@ -713,6 +720,7 @@ func TestTransactionCancellation(t *testing.T) {
}
func TestStopTransaction(t *testing.T) {
+ t.Parallel()
hash := sha1.Sum([]byte("foo"))
t.Run("stopping nonexisting transaction fails", func(t *testing.T) {