gitlab.com/gitlab-org/gitaly.git
author    Pavlo Strokov <pstrokov@gitlab.com>    2022-02-18 13:41:33 +0300
committer Pavlo Strokov <pstrokov@gitlab.com>    2022-02-23 14:00:21 +0300
commit    bdd73865268ec2d237bf4cb6694c97e91bab8761 (patch)
tree      4880d06c42cc8db3d9dbf88b7da46971d1f7bd0f
parent    c61dedb0af342fb1f21a2fa994aad9fb9873587c (diff)
praefect: Repository should exist before scheduling events (branch: ps-clean-queue-on-repo-removal)
Since the introduction of the repository ID, we use it for various operations where the relative path and virtual storage were used before. One of these use cases is the creation of replication events: the injected repository ID is used to identify the concrete repository later on. Because the repository ID is now a foreign key, we are no longer allowed to create replication events for repositories that do not yet exist in the database. That is why we use PostgresRepositoryStore to create the repository records before we enqueue any replication events. The change also fixes some inconsistencies in the test setup logic, such as double database creation and unneeded use of transactions.
-rw-r--r--  internal/praefect/coordinator_pg_test.go  17
-rw-r--r--  internal/praefect/coordinator_test.go      25
-rw-r--r--  internal/praefect/replicator_test.go       73
-rw-r--r--  internal/praefect/server_test.go           29
4 files changed, 84 insertions(+), 60 deletions(-)
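
The pattern enforced throughout the tests below can be summarised with a short sketch. It is not part of the commit itself but an illustrative Go test snippet built from the calls that appear in the diff (testdb.New, NewPostgresRepositoryStore, CreateRepository, NewPostgresReplicationEventQueue, Enqueue). The test name, storage names, relative path, and the gitlab.com/gitlab-org/gitaly/v14 import paths are assumptions for illustration; the storage-name map mirrors the shape returned by conf.StorageNames() in the diff.

```go
package praefect

import (
	"testing"

	"github.com/stretchr/testify/require"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
	"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
	"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
)

// TestEnqueueRequiresExistingRepository is a hypothetical test sketching the
// ordering this commit establishes: create the repository record first, only
// then enqueue replication events that reference its repository ID.
func TestEnqueueRequiresExistingRepository(t *testing.T) {
	ctx := testhelper.Context(t)
	db := testdb.New(t)

	// Virtual storage "praefect" backed by two physical storages (placeholder names).
	storageNames := map[string][]string{"praefect": {"gitaly-1", "gitaly-2"}}

	// 1. Create the repository record. Replication events now carry a repository
	//    ID constrained by a foreign key, so the repository must exist first.
	rs := datastore.NewPostgresRepositoryStore(db, storageNames)
	require.NoError(t, rs.CreateRepository(ctx, 1, "praefect", "relative/path", "relative/path", "gitaly-1", nil, nil, true, false))

	// 2. Only afterwards can events referencing that repository ID be enqueued.
	queue := datastore.NewPostgresReplicationEventQueue(db)
	_, err := queue.Enqueue(ctx, datastore.ReplicationEvent{Job: datastore.ReplicationJob{
		RepositoryID:      1,
		Change:            datastore.UpdateRepo,
		RelativePath:      "relative/path",
		VirtualStorage:    "praefect",
		TargetNodeStorage: "gitaly-2",
		SourceNodeStorage: "gitaly-1",
	}})
	require.NoError(t, err)
}
```

Enqueueing before CreateRepository would violate the foreign key, which is exactly why the tests in the diff move repository creation ahead of queue setup.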
diff --git a/internal/praefect/coordinator_pg_test.go b/internal/praefect/coordinator_pg_test.go
index e35da6b5d..c13fe0570 100644
--- a/internal/praefect/coordinator_pg_test.go
+++ b/internal/praefect/coordinator_pg_test.go
@@ -191,18 +191,11 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
txMgr := transactions.NewManager(conf)
- tx := db.Begin(t)
- defer tx.Rollback(t)
-
// set up the generations prior to transaction
- rs := datastore.NewPostgresRepositoryStore(tx, conf.StorageNames())
+ rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
repoCreated := false
for i, n := range tc.nodes {
- if n.generation == datastore.GenerationUnknown {
- continue
- }
-
if !repoCreated {
repoCreated = true
require.NoError(t, rs.CreateRepository(ctx, 1, repo.StorageName, repo.RelativePath, repo.RelativePath, storageNodes[i].Storage, nil, nil, true, false))
@@ -211,7 +204,7 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
require.NoError(t, rs.SetGeneration(ctx, 1, storageNodes[i].Storage, repo.RelativePath, n.generation))
}
- testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": conf.StorageNames()})
+ testdb.SetHealthyNodes(t, ctx, db, map[string]map[string][]string{"praefect": conf.StorageNames()})
nodeSet, err := DialNodes(
ctx,
@@ -229,11 +222,11 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
rs,
NewPerRepositoryRouter(
nodeSet.Connections(),
- nodes.NewPerRepositoryElector(tx),
+ nodes.NewPerRepositoryElector(db),
StaticHealthChecker(conf.StorageNames()),
NewLockedRandom(rand.New(rand.NewSource(0))),
rs,
- datastore.NewAssignmentStore(tx, conf.StorageNames()),
+ datastore.NewAssignmentStore(db, conf.StorageNames()),
rs,
nil,
),
@@ -315,8 +308,6 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
require.Equal(t, n.expectedGeneration, gen, "node %d has wrong generation", i)
}
- tx.Commit(t)
-
replicationWaitGroup.Wait()
for i, node := range tc.nodes {
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 4749b3acb..0bb224d3d 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -189,7 +189,7 @@ func TestStreamDirectorMutator(t *testing.T) {
}
testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": conf.StorageNames()})
- queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db))
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(tx))
queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
assert.True(t, len(queueInterceptor.GetEnqueued()) < 2, "expected only one event to be created")
return queue.Enqueue(ctx, event)
@@ -322,8 +322,13 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) {
txMgr := transactions.NewManager(conf)
+ db := testdb.New(t)
+ require.NoError(t, datastore.NewPostgresRepositoryStore(db, conf.StorageNames()).
+ CreateRepository(ctx, 1, repo.GetStorageName(), repo.GetRelativePath(), repo.GetRelativePath(), "primary", []string{"secondary"}, nil, true, false),
+ )
+
coordinator := NewCoordinator(
- datastore.NewPostgresReplicationEventQueue(testdb.New(t)),
+ datastore.NewPostgresReplicationEventQueue(db),
rs,
NewNodeManagerRouter(nodeMgr, rs),
txMgr,
@@ -869,7 +874,7 @@ func TestStreamDirector_repo_creation(t *testing.T) {
assert.Equal(t, []string{unhealthySecondaryNode.Storage}, outdatedSecondaries)
assert.Equal(t, tc.primaryStored, storePrimary)
assert.Equal(t, tc.assignmentsStored, storeAssignments)
- return nil
+ return datastore.NewPostgresRepositoryStore(db, conf.StorageNames()).CreateRepository(ctx, repoID, virtualStorage, relativePath, replicaPath, primary, updatedSecondaries, outdatedSecondaries, storePrimary, storeAssignments)
},
}
@@ -1078,7 +1083,8 @@ func TestAbsentCorrelationID(t *testing.T) {
},
}
- queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t)))
+ db := testdb.New(t)
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db))
queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
assert.True(t, len(queueInterceptor.GetEnqueued()) < 2, "expected only one event to be created")
return queue.Enqueue(ctx, event)
@@ -1097,7 +1103,7 @@ func TestAbsentCorrelationID(t *testing.T) {
defer nodeMgr.Stop()
txMgr := transactions.NewManager(conf)
- rs := datastore.MockRepositoryStore{}
+ rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
coordinator := NewCoordinator(
queueInterceptor,
@@ -1553,7 +1559,14 @@ func TestCoordinator_grpcErrorHandling(t *testing.T) {
})
}
+ db := testdb.New(t)
+ queue := datastore.NewPostgresReplicationEventQueue(db)
+ require.NoError(t, datastore.NewPostgresRepositoryStore(db, praefectConfig.StorageNames()).
+ CreateRepository(ctx, 1, testhelper.DefaultStorageName, repoProto.GetRelativePath(), repoProto.GetRelativePath(), repoProto.GetStorageName(), nil, nil, true, false),
+ )
+
praefectConn, _, cleanup := runPraefectServer(t, ctx, praefectConfig, buildOptions{
+ withQueue: queue,
// Set up a mock manager which sets up primary/secondaries and pretends that all nodes are
// healthy. We need fixed roles and unhealthy nodes will not take part in transactions.
withNodeMgr: &nodes.MockManager{
@@ -1569,7 +1582,7 @@ func TestCoordinator_grpcErrorHandling(t *testing.T) {
}, nil
},
},
- // Set up a mock repsoitory store pretending that all nodes are consistent. Only consistent
+ // Set up a mock repository store pretending that all nodes are consistent. Only consistent
// nodes will take part in transactions.
withRepoStore: datastore.MockRepositoryStore{
GetReplicaPathFunc: func(ctx context.Context, repositoryID int64) (string, error) {
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 7cd48ff3a..4018b51bd 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -56,6 +56,8 @@ func TestReplMgr_ProcessBacklog(t *testing.T) {
}
func testReplMgrProcessBacklog(t *testing.T, ctx context.Context) {
+ db := testdb.New(t)
+
primaryCfg, testRepoProto, testRepoPath := testcfg.BuildWithRepo(t, testcfg.WithStorages("primary"))
testRepo := localrepo.NewTestRepo(t, primaryCfg, testRepoProto)
primaryCfg.SocketPath = testserver.RunGitalyServer(t, primaryCfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
@@ -155,7 +157,10 @@ func testReplMgrProcessBacklog(t *testing.T, ctx context.Context) {
logger := testhelper.NewDiscardingLogger(t)
loggerHook := test.NewLocal(logger)
- queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t)))
+ rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
+ require.NoError(t, rs.CreateRepository(ctx, repositoryID, conf.VirtualStorages[0].Name, testRepo.GetRelativePath(), testRepo.GetRelativePath(), shard.Primary.GetStorage(), nil, nil, true, false))
+
+ queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db))
queue.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) {
cancel() // when it is called we know that replication is finished
return queue.Acknowledge(ctx, state, ids)
@@ -165,10 +170,6 @@ func testReplMgrProcessBacklog(t *testing.T, ctx context.Context) {
_, err = queue.Enqueue(ctx, events[0])
require.NoError(t, err)
- db := testdb.New(t)
- rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
- require.NoError(t, rs.CreateRepository(ctx, repositoryID, conf.VirtualStorages[0].Name, testRepo.GetRelativePath(), testRepo.GetRelativePath(), shard.Primary.GetStorage(), nil, nil, true, false))
-
replMgr := NewReplMgr(
loggerEntry,
conf.StorageNames(),
@@ -282,6 +283,8 @@ func TestReplicatorDowngradeAttempt(t *testing.T) {
func TestReplicator_PropagateReplicationJob(t *testing.T) {
t.Parallel()
+ db := testdb.New(t)
+
primaryStorage, secondaryStorage := "internal-gitaly-0", "internal-gitaly-1"
primCfg := testcfg.Build(t, testcfg.WithStorages(primaryStorage))
@@ -315,7 +318,7 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) {
// unlinkat /tmp/gitaly-222007427/381349228/storages.d/internal-gitaly-1/+gitaly/state/path/to/repo: directory not empty
// By using WaitGroup we are sure the test cleanup will be started after all replication
// requests are completed, so no running cache IO operations happen.
- queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t)))
+ queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db))
var wg sync.WaitGroup
queue.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
wg.Add(1)
@@ -375,6 +378,9 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) {
StorageName: conf.VirtualStorages[0].Name,
RelativePath: repositoryRelativePath,
}
+ // We need to create repository record in the database before we can schedule replication events
+ repoStore := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
+ require.NoError(t, repoStore.CreateRepository(ctx, 1, conf.VirtualStorages[0].Name, repository.GetRelativePath(), repository.GetRelativePath(), repository.GetStorageName(), nil, nil, true, false))
//nolint:staticcheck
_, err = repositoryClient.GarbageCollect(ctx, &gitalypb.GarbageCollectRequest{
@@ -685,6 +691,10 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
}
func testProcessBacklogFailedJobs(t *testing.T, ctx context.Context) {
+ ctx, cancel := context.WithCancel(ctx)
+ t.Cleanup(cancel)
+ db := testdb.New(t)
+
primaryCfg, testRepo, _ := testcfg.BuildWithRepo(t, testcfg.WithStorages("default"))
primaryAddr := testserver.RunGitalyServer(t, primaryCfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
@@ -714,26 +724,30 @@ func testProcessBacklogFailedJobs(t *testing.T, ctx context.Context) {
},
},
}
- ctx, cancel := context.WithCancel(ctx)
-
- queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t)))
// this job exists to verify that replication works
okJob := datastore.ReplicationJob{
- RepositoryID: 1,
- Change: datastore.UpdateRepo,
+ RepositoryID: 1,
+ // The optimize_repository is used because we can't use update type.
+ // All update events won't be picked as they will be marked as done once the first is over.
+ Change: datastore.OptimizeRepository,
RelativePath: testRepo.RelativePath,
TargetNodeStorage: secondary.Storage,
- SourceNodeStorage: primary.Storage,
VirtualStorage: "praefect",
}
+
+ rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
+ require.NoError(t, rs.CreateRepository(ctx, okJob.RepositoryID, okJob.VirtualStorage, okJob.RelativePath, okJob.RelativePath, okJob.SourceNodeStorage, nil, nil, true, false))
+
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db))
event1, err := queueInterceptor.Enqueue(ctx, datastore.ReplicationEvent{Job: okJob})
require.NoError(t, err)
require.Equal(t, uint64(1), event1.ID)
- // this job checks flow for replication event that fails
+ // this job checks flow for replication event that fails because of not existing source storage
failJob := okJob
- failJob.Change = "invalid-operation"
+ failJob.Change = datastore.UpdateRepo
+ failJob.SourceNodeStorage = "invalid"
event2, err := queueInterceptor.Enqueue(ctx, datastore.ReplicationEvent{Job: failJob})
require.NoError(t, err)
require.Equal(t, uint64(2), event2.ID)
@@ -745,10 +759,6 @@ func testProcessBacklogFailedJobs(t *testing.T, ctx context.Context) {
nodeMgr.Start(0, time.Hour)
defer nodeMgr.Stop()
- db := testdb.New(t)
- rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
- require.NoError(t, rs.CreateRepository(ctx, okJob.RepositoryID, okJob.VirtualStorage, okJob.RelativePath, okJob.RelativePath, okJob.SourceNodeStorage, nil, nil, true, false))
-
replMgr := NewReplMgr(
logEntry,
conf.StorageNames(),
@@ -792,6 +802,8 @@ func TestProcessBacklog_Success(t *testing.T) {
func testProcessBacklogSuccess(t *testing.T, ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
+ t.Cleanup(cancel)
+ db := testdb.New(t)
primaryCfg, testRepo, _ := testcfg.BuildWithRepo(t, testcfg.WithStorages("primary"))
primaryCfg.SocketPath = testserver.RunGitalyServer(t, primaryCfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
@@ -825,7 +837,7 @@ func testProcessBacklogSuccess(t *testing.T, ctx context.Context) {
},
}
- queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t)))
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db))
queueInterceptor.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) {
ackIDs, err := queue.Acknowledge(ctx, state, ids)
if len(ids) > 0 {
@@ -854,6 +866,9 @@ func testProcessBacklogSuccess(t *testing.T, ctx context.Context) {
},
}
+ rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
+ require.NoError(t, rs.CreateRepository(ctx, eventType1.Job.RepositoryID, eventType1.Job.VirtualStorage, eventType1.Job.VirtualStorage, eventType1.Job.RelativePath, eventType1.Job.SourceNodeStorage, nil, nil, true, false))
+
_, err := queueInterceptor.Enqueue(ctx, eventType1)
require.NoError(t, err)
@@ -869,6 +884,7 @@ func testProcessBacklogSuccess(t *testing.T, ctx context.Context) {
// Rename replication job
eventType2 := datastore.ReplicationEvent{
Job: datastore.ReplicationJob{
+ RepositoryID: 1,
Change: datastore.RenameRepo,
RelativePath: testRepo.GetRelativePath(),
TargetNodeStorage: secondary.Storage,
@@ -884,6 +900,7 @@ func testProcessBacklogSuccess(t *testing.T, ctx context.Context) {
// Rename replication job
eventType3 := datastore.ReplicationEvent{
Job: datastore.ReplicationJob{
+ RepositoryID: 1,
Change: datastore.RenameRepo,
RelativePath: renameTo1,
TargetNodeStorage: secondary.Storage,
@@ -904,10 +921,6 @@ func testProcessBacklogSuccess(t *testing.T, ctx context.Context) {
nodeMgr.Start(0, time.Hour)
defer nodeMgr.Stop()
- db := testdb.New(t)
- rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
- require.NoError(t, rs.CreateRepository(ctx, eventType1.Job.RepositoryID, eventType1.Job.VirtualStorage, eventType1.Job.VirtualStorage, eventType1.Job.RelativePath, eventType1.Job.SourceNodeStorage, nil, nil, true, false))
-
replMgr := NewReplMgr(
logEntry,
conf.StorageNames(),
@@ -934,6 +947,7 @@ func testProcessBacklogSuccess(t *testing.T, ctx context.Context) {
func TestReplMgrProcessBacklog_OnlyHealthyNodes(t *testing.T) {
t.Parallel()
+ db := testdb.New(t)
conf := config.Config{
VirtualStorages: []*config.VirtualStorage{
{
@@ -951,7 +965,7 @@ func TestReplMgrProcessBacklog_OnlyHealthyNodes(t *testing.T) {
var mtx sync.Mutex
expStorages := map[string]bool{conf.VirtualStorages[0].Nodes[0].Storage: true, conf.VirtualStorages[0].Nodes[2].Storage: true}
- queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t)))
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db))
queueInterceptor.OnDequeue(func(_ context.Context, virtualStorageName string, storageName string, _ int, _ datastore.ReplicationEventQueue) ([]datastore.ReplicationEvent, error) {
select {
case <-ctx.Done():
@@ -1012,6 +1026,8 @@ func (m mockReplicator) Replicate(ctx context.Context, event datastore.Replicati
func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(testhelper.Context(t))
+ t.Cleanup(cancel)
+ db := testdb.New(t)
const virtualStorage = "virtal-storage"
const primaryStorage = "storage-1"
@@ -1033,7 +1049,10 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) {
},
}
- queue := datastore.NewPostgresReplicationEventQueue(testdb.New(t))
+ rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
+ require.NoError(t, rs.CreateRepository(ctx, repositoryID, virtualStorage, "ignored", "ignored", primaryStorage, []string{secondaryStorage}, nil, true, false))
+
+ queue := datastore.NewPostgresReplicationEventQueue(db)
_, err := queue.Enqueue(ctx, datastore.ReplicationEvent{
Job: datastore.ReplicationJob{
RepositoryID: 1,
@@ -1046,10 +1065,6 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) {
})
require.NoError(t, err)
- db := testdb.New(t)
- rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
- require.NoError(t, rs.CreateRepository(ctx, repositoryID, virtualStorage, "ignored", "ignored", primaryStorage, []string{secondaryStorage}, nil, true, false))
-
replMgr := NewReplMgr(
testhelper.NewDiscardingLogEntry(t),
conf.StorageNames(),
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index c3681a34c..4de7e0fc6 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -521,7 +521,14 @@ func TestRemoveRepository(t *testing.T) {
verifyReposExistence(t, codes.OK)
- queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t)))
+ virtualRepo := proto.Clone(repos[0][0]).(*gitalypb.Repository)
+ virtualRepo.StorageName = praefectCfg.VirtualStorages[0].Name
+
+ db := testdb.New(t)
+ repositoryStore := datastore.NewPostgresRepositoryStore(db, praefectCfg.StorageNames())
+ require.NoError(t, repositoryStore.CreateRepository(ctx, 1, virtualRepo.GetStorageName(), virtualRepo.GetRelativePath(), virtualRepo.GetRelativePath(), gitalyCfgs[0].Storages[0].Name, nil, nil, true, false))
+
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db))
repoStore := defaultRepoStore(praefectCfg)
txMgr := defaultTxMgr(praefectCfg)
nodeMgr, err := nodes.NewManager(testhelper.NewDiscardingLogEntry(t), praefectCfg, nil,
@@ -541,15 +548,15 @@ func TestRemoveRepository(t *testing.T) {
GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) {
return relativePath, nil, nil
},
+ GetRepositoryIDFunc: func(ctx context.Context, virtualStorage, relativePath string) (int64, error) {
+ return repositoryStore.GetRepositoryID(ctx, virtualStorage, relativePath)
+ },
},
withNodeMgr: nodeMgr,
withTxMgr: txMgr,
})
defer cleanup()
- virtualRepo := proto.Clone(repos[0][0]).(*gitalypb.Repository)
- virtualRepo.StorageName = praefectCfg.VirtualStorages[0].Name
-
_, err = gitalypb.NewRepositoryServiceClient(cc).RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{
Repository: virtualRepo,
})
@@ -616,30 +623,28 @@ func TestRenameRepository(t *testing.T) {
repoPaths[i] = filepath.Join(gitalyCfg.Storages[0].Path, relativePath)
}
- evq := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t)))
-
- tx := testdb.New(t).Begin(t)
- defer tx.Rollback(t)
+ db := testdb.New(t)
+ evq := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db))
- rs := datastore.NewPostgresRepositoryStore(tx, nil)
+ rs := datastore.NewPostgresRepositoryStore(db, nil)
require.NoError(t, rs.CreateRepository(ctx, 1, "praefect", repo.RelativePath, repo.RelativePath, "gitaly-1", []string{"gitaly-2", "gitaly-3"}, nil, true, false))
nodeSet, err := DialNodes(ctx, praefectCfg.VirtualStorages, nil, nil, nil, nil)
require.NoError(t, err)
defer nodeSet.Close()
- testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": praefectCfg.StorageNames()})
+ testdb.SetHealthyNodes(t, ctx, db, map[string]map[string][]string{"praefect": praefectCfg.StorageNames()})
cc, _, cleanup := runPraefectServer(t, ctx, praefectCfg, buildOptions{
withQueue: evq,
withRepoStore: rs,
withRouter: NewPerRepositoryRouter(
nodeSet.Connections(),
- nodes.NewPerRepositoryElector(tx),
+ nodes.NewPerRepositoryElector(db),
StaticHealthChecker(praefectCfg.StorageNames()),
NewLockedRandom(rand.New(rand.NewSource(0))),
rs,
- datastore.NewAssignmentStore(tx, praefectCfg.StorageNames()),
+ datastore.NewAssignmentStore(db, praefectCfg.StorageNames()),
rs,
nil,
),