Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--internal/praefect/coordinator_pg_test.go17
-rw-r--r--internal/praefect/coordinator_test.go25
-rw-r--r--internal/praefect/replicator_test.go73
-rw-r--r--internal/praefect/server_test.go29
4 files changed, 84 insertions, 60 deletions
diff --git a/internal/praefect/coordinator_pg_test.go b/internal/praefect/coordinator_pg_test.go
index e35da6b5d..c13fe0570 100644
--- a/internal/praefect/coordinator_pg_test.go
+++ b/internal/praefect/coordinator_pg_test.go
@@ -191,18 +191,11 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
txMgr := transactions.NewManager(conf)
- tx := db.Begin(t)
- defer tx.Rollback(t)
-
// set up the generations prior to transaction
- rs := datastore.NewPostgresRepositoryStore(tx, conf.StorageNames())
+ rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
repoCreated := false
for i, n := range tc.nodes {
- if n.generation == datastore.GenerationUnknown {
- continue
- }
-
if !repoCreated {
repoCreated = true
require.NoError(t, rs.CreateRepository(ctx, 1, repo.StorageName, repo.RelativePath, repo.RelativePath, storageNodes[i].Storage, nil, nil, true, false))
@@ -211,7 +204,7 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
require.NoError(t, rs.SetGeneration(ctx, 1, storageNodes[i].Storage, repo.RelativePath, n.generation))
}
- testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": conf.StorageNames()})
+ testdb.SetHealthyNodes(t, ctx, db, map[string]map[string][]string{"praefect": conf.StorageNames()})
nodeSet, err := DialNodes(
ctx,
@@ -229,11 +222,11 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
rs,
NewPerRepositoryRouter(
nodeSet.Connections(),
- nodes.NewPerRepositoryElector(tx),
+ nodes.NewPerRepositoryElector(db),
StaticHealthChecker(conf.StorageNames()),
NewLockedRandom(rand.New(rand.NewSource(0))),
rs,
- datastore.NewAssignmentStore(tx, conf.StorageNames()),
+ datastore.NewAssignmentStore(db, conf.StorageNames()),
rs,
nil,
),
@@ -315,8 +308,6 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
require.Equal(t, n.expectedGeneration, gen, "node %d has wrong generation", i)
}
- tx.Commit(t)
-
replicationWaitGroup.Wait()
for i, node := range tc.nodes {
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 4749b3acb..0bb224d3d 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -189,7 +189,7 @@ func TestStreamDirectorMutator(t *testing.T) {
}
testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": conf.StorageNames()})
- queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db))
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(tx))
queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
assert.True(t, len(queueInterceptor.GetEnqueued()) < 2, "expected only one event to be created")
return queue.Enqueue(ctx, event)
@@ -322,8 +322,13 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) {
txMgr := transactions.NewManager(conf)
+ db := testdb.New(t)
+ require.NoError(t, datastore.NewPostgresRepositoryStore(db, conf.StorageNames()).
+ CreateRepository(ctx, 1, repo.GetStorageName(), repo.GetRelativePath(), repo.GetRelativePath(), "primary", []string{"secondary"}, nil, true, false),
+ )
+
coordinator := NewCoordinator(
- datastore.NewPostgresReplicationEventQueue(testdb.New(t)),
+ datastore.NewPostgresReplicationEventQueue(db),
rs,
NewNodeManagerRouter(nodeMgr, rs),
txMgr,
@@ -869,7 +874,7 @@ func TestStreamDirector_repo_creation(t *testing.T) {
assert.Equal(t, []string{unhealthySecondaryNode.Storage}, outdatedSecondaries)
assert.Equal(t, tc.primaryStored, storePrimary)
assert.Equal(t, tc.assignmentsStored, storeAssignments)
- return nil
+ return datastore.NewPostgresRepositoryStore(db, conf.StorageNames()).CreateRepository(ctx, repoID, virtualStorage, relativePath, replicaPath, primary, updatedSecondaries, outdatedSecondaries, storePrimary, storeAssignments)
},
}
@@ -1078,7 +1083,8 @@ func TestAbsentCorrelationID(t *testing.T) {
},
}
- queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t)))
+ db := testdb.New(t)
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db))
queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
assert.True(t, len(queueInterceptor.GetEnqueued()) < 2, "expected only one event to be created")
return queue.Enqueue(ctx, event)
@@ -1097,7 +1103,7 @@ func TestAbsentCorrelationID(t *testing.T) {
defer nodeMgr.Stop()
txMgr := transactions.NewManager(conf)
- rs := datastore.MockRepositoryStore{}
+ rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
coordinator := NewCoordinator(
queueInterceptor,
@@ -1553,7 +1559,14 @@ func TestCoordinator_grpcErrorHandling(t *testing.T) {
})
}
+ db := testdb.New(t)
+ queue := datastore.NewPostgresReplicationEventQueue(db)
+ require.NoError(t, datastore.NewPostgresRepositoryStore(db, praefectConfig.StorageNames()).
+ CreateRepository(ctx, 1, testhelper.DefaultStorageName, repoProto.GetRelativePath(), repoProto.GetRelativePath(), repoProto.GetStorageName(), nil, nil, true, false),
+ )
+
praefectConn, _, cleanup := runPraefectServer(t, ctx, praefectConfig, buildOptions{
+ withQueue: queue,
// Set up a mock manager which sets up primary/secondaries and pretends that all nodes are
// healthy. We need fixed roles and unhealthy nodes will not take part in transactions.
withNodeMgr: &nodes.MockManager{
@@ -1569,7 +1582,7 @@ func TestCoordinator_grpcErrorHandling(t *testing.T) {
}, nil
},
},
- // Set up a mock repsoitory store pretending that all nodes are consistent. Only consistent
+ // Set up a mock repository store pretending that all nodes are consistent. Only consistent
// nodes will take part in transactions.
withRepoStore: datastore.MockRepositoryStore{
GetReplicaPathFunc: func(ctx context.Context, repositoryID int64) (string, error) {
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 7cd48ff3a..4018b51bd 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -56,6 +56,8 @@ func TestReplMgr_ProcessBacklog(t *testing.T) {
}
func testReplMgrProcessBacklog(t *testing.T, ctx context.Context) {
+ db := testdb.New(t)
+
primaryCfg, testRepoProto, testRepoPath := testcfg.BuildWithRepo(t, testcfg.WithStorages("primary"))
testRepo := localrepo.NewTestRepo(t, primaryCfg, testRepoProto)
primaryCfg.SocketPath = testserver.RunGitalyServer(t, primaryCfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
@@ -155,7 +157,10 @@ func testReplMgrProcessBacklog(t *testing.T, ctx context.Context) {
logger := testhelper.NewDiscardingLogger(t)
loggerHook := test.NewLocal(logger)
- queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t)))
+ rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
+ require.NoError(t, rs.CreateRepository(ctx, repositoryID, conf.VirtualStorages[0].Name, testRepo.GetRelativePath(), testRepo.GetRelativePath(), shard.Primary.GetStorage(), nil, nil, true, false))
+
+ queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db))
queue.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) {
cancel() // when it is called we know that replication is finished
return queue.Acknowledge(ctx, state, ids)
@@ -165,10 +170,6 @@ func testReplMgrProcessBacklog(t *testing.T, ctx context.Context) {
_, err = queue.Enqueue(ctx, events[0])
require.NoError(t, err)
- db := testdb.New(t)
- rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
- require.NoError(t, rs.CreateRepository(ctx, repositoryID, conf.VirtualStorages[0].Name, testRepo.GetRelativePath(), testRepo.GetRelativePath(), shard.Primary.GetStorage(), nil, nil, true, false))
-
replMgr := NewReplMgr(
loggerEntry,
conf.StorageNames(),
@@ -282,6 +283,8 @@ func TestReplicatorDowngradeAttempt(t *testing.T) {
func TestReplicator_PropagateReplicationJob(t *testing.T) {
t.Parallel()
+ db := testdb.New(t)
+
primaryStorage, secondaryStorage := "internal-gitaly-0", "internal-gitaly-1"
primCfg := testcfg.Build(t, testcfg.WithStorages(primaryStorage))
@@ -315,7 +318,7 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) {
// unlinkat /tmp/gitaly-222007427/381349228/storages.d/internal-gitaly-1/+gitaly/state/path/to/repo: directory not empty
// By using WaitGroup we are sure the test cleanup will be started after all replication
// requests are completed, so no running cache IO operations happen.
- queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t)))
+ queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db))
var wg sync.WaitGroup
queue.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
wg.Add(1)
@@ -375,6 +378,9 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) {
StorageName: conf.VirtualStorages[0].Name,
RelativePath: repositoryRelativePath,
}
+ // We need to create repository record in the database before we can schedule replication events
+ repoStore := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
+ require.NoError(t, repoStore.CreateRepository(ctx, 1, conf.VirtualStorages[0].Name, repository.GetRelativePath(), repository.GetRelativePath(), repository.GetStorageName(), nil, nil, true, false))
//nolint:staticcheck
_, err = repositoryClient.GarbageCollect(ctx, &gitalypb.GarbageCollectRequest{
@@ -685,6 +691,10 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
}
func testProcessBacklogFailedJobs(t *testing.T, ctx context.Context) {
+ ctx, cancel := context.WithCancel(ctx)
+ t.Cleanup(cancel)
+ db := testdb.New(t)
+
primaryCfg, testRepo, _ := testcfg.BuildWithRepo(t, testcfg.WithStorages("default"))
primaryAddr := testserver.RunGitalyServer(t, primaryCfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
@@ -714,26 +724,30 @@ func testProcessBacklogFailedJobs(t *testing.T, ctx context.Context) {
},
},
}
- ctx, cancel := context.WithCancel(ctx)
-
- queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t)))
// this job exists to verify that replication works
okJob := datastore.ReplicationJob{
- RepositoryID: 1,
- Change: datastore.UpdateRepo,
+ RepositoryID: 1,
+ // The optimize_repository is used because we can't use update type.
+ // All update events won't be picked as they will be marked as done once the first is over.
+ Change: datastore.OptimizeRepository,
RelativePath: testRepo.RelativePath,
TargetNodeStorage: secondary.Storage,
- SourceNodeStorage: primary.Storage,
VirtualStorage: "praefect",
}
+
+ rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
+ require.NoError(t, rs.CreateRepository(ctx, okJob.RepositoryID, okJob.VirtualStorage, okJob.RelativePath, okJob.RelativePath, okJob.SourceNodeStorage, nil, nil, true, false))
+
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db))
event1, err := queueInterceptor.Enqueue(ctx, datastore.ReplicationEvent{Job: okJob})
require.NoError(t, err)
require.Equal(t, uint64(1), event1.ID)
- // this job checks flow for replication event that fails
+ // this job checks flow for replication event that fails because of not existing source storage
failJob := okJob
- failJob.Change = "invalid-operation"
+ failJob.Change = datastore.UpdateRepo
+ failJob.SourceNodeStorage = "invalid"
event2, err := queueInterceptor.Enqueue(ctx, datastore.ReplicationEvent{Job: failJob})
require.NoError(t, err)
require.Equal(t, uint64(2), event2.ID)
@@ -745,10 +759,6 @@ func testProcessBacklogFailedJobs(t *testing.T, ctx context.Context) {
nodeMgr.Start(0, time.Hour)
defer nodeMgr.Stop()
- db := testdb.New(t)
- rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
- require.NoError(t, rs.CreateRepository(ctx, okJob.RepositoryID, okJob.VirtualStorage, okJob.RelativePath, okJob.RelativePath, okJob.SourceNodeStorage, nil, nil, true, false))
-
replMgr := NewReplMgr(
logEntry,
conf.StorageNames(),
@@ -792,6 +802,8 @@ func TestProcessBacklog_Success(t *testing.T) {
func testProcessBacklogSuccess(t *testing.T, ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
+ t.Cleanup(cancel)
+ db := testdb.New(t)
primaryCfg, testRepo, _ := testcfg.BuildWithRepo(t, testcfg.WithStorages("primary"))
primaryCfg.SocketPath = testserver.RunGitalyServer(t, primaryCfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
@@ -825,7 +837,7 @@ func testProcessBacklogSuccess(t *testing.T, ctx context.Context) {
},
}
- queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t)))
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db))
queueInterceptor.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) {
ackIDs, err := queue.Acknowledge(ctx, state, ids)
if len(ids) > 0 {
@@ -854,6 +866,9 @@ func testProcessBacklogSuccess(t *testing.T, ctx context.Context) {
},
}
+ rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
+ require.NoError(t, rs.CreateRepository(ctx, eventType1.Job.RepositoryID, eventType1.Job.VirtualStorage, eventType1.Job.VirtualStorage, eventType1.Job.RelativePath, eventType1.Job.SourceNodeStorage, nil, nil, true, false))
+
_, err := queueInterceptor.Enqueue(ctx, eventType1)
require.NoError(t, err)
@@ -869,6 +884,7 @@ func testProcessBacklogSuccess(t *testing.T, ctx context.Context) {
// Rename replication job
eventType2 := datastore.ReplicationEvent{
Job: datastore.ReplicationJob{
+ RepositoryID: 1,
Change: datastore.RenameRepo,
RelativePath: testRepo.GetRelativePath(),
TargetNodeStorage: secondary.Storage,
@@ -884,6 +900,7 @@ func testProcessBacklogSuccess(t *testing.T, ctx context.Context) {
// Rename replication job
eventType3 := datastore.ReplicationEvent{
Job: datastore.ReplicationJob{
+ RepositoryID: 1,
Change: datastore.RenameRepo,
RelativePath: renameTo1,
TargetNodeStorage: secondary.Storage,
@@ -904,10 +921,6 @@ func testProcessBacklogSuccess(t *testing.T, ctx context.Context) {
nodeMgr.Start(0, time.Hour)
defer nodeMgr.Stop()
- db := testdb.New(t)
- rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
- require.NoError(t, rs.CreateRepository(ctx, eventType1.Job.RepositoryID, eventType1.Job.VirtualStorage, eventType1.Job.VirtualStorage, eventType1.Job.RelativePath, eventType1.Job.SourceNodeStorage, nil, nil, true, false))
-
replMgr := NewReplMgr(
logEntry,
conf.StorageNames(),
@@ -934,6 +947,7 @@ func testProcessBacklogSuccess(t *testing.T, ctx context.Context) {
func TestReplMgrProcessBacklog_OnlyHealthyNodes(t *testing.T) {
t.Parallel()
+ db := testdb.New(t)
conf := config.Config{
VirtualStorages: []*config.VirtualStorage{
{
@@ -951,7 +965,7 @@ func TestReplMgrProcessBacklog_OnlyHealthyNodes(t *testing.T) {
var mtx sync.Mutex
expStorages := map[string]bool{conf.VirtualStorages[0].Nodes[0].Storage: true, conf.VirtualStorages[0].Nodes[2].Storage: true}
- queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t)))
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db))
queueInterceptor.OnDequeue(func(_ context.Context, virtualStorageName string, storageName string, _ int, _ datastore.ReplicationEventQueue) ([]datastore.ReplicationEvent, error) {
select {
case <-ctx.Done():
@@ -1012,6 +1026,8 @@ func (m mockReplicator) Replicate(ctx context.Context, event datastore.Replicati
func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(testhelper.Context(t))
+ t.Cleanup(cancel)
+ db := testdb.New(t)
const virtualStorage = "virtal-storage"
const primaryStorage = "storage-1"
@@ -1033,7 +1049,10 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) {
},
}
- queue := datastore.NewPostgresReplicationEventQueue(testdb.New(t))
+ rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
+ require.NoError(t, rs.CreateRepository(ctx, repositoryID, virtualStorage, "ignored", "ignored", primaryStorage, []string{secondaryStorage}, nil, true, false))
+
+ queue := datastore.NewPostgresReplicationEventQueue(db)
_, err := queue.Enqueue(ctx, datastore.ReplicationEvent{
Job: datastore.ReplicationJob{
RepositoryID: 1,
@@ -1046,10 +1065,6 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) {
})
require.NoError(t, err)
- db := testdb.New(t)
- rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
- require.NoError(t, rs.CreateRepository(ctx, repositoryID, virtualStorage, "ignored", "ignored", primaryStorage, []string{secondaryStorage}, nil, true, false))
-
replMgr := NewReplMgr(
testhelper.NewDiscardingLogEntry(t),
conf.StorageNames(),
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index c3681a34c..4de7e0fc6 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -521,7 +521,14 @@ func TestRemoveRepository(t *testing.T) {
verifyReposExistence(t, codes.OK)
- queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t)))
+ virtualRepo := proto.Clone(repos[0][0]).(*gitalypb.Repository)
+ virtualRepo.StorageName = praefectCfg.VirtualStorages[0].Name
+
+ db := testdb.New(t)
+ repositoryStore := datastore.NewPostgresRepositoryStore(db, praefectCfg.StorageNames())
+ require.NoError(t, repositoryStore.CreateRepository(ctx, 1, virtualRepo.GetStorageName(), virtualRepo.GetRelativePath(), virtualRepo.GetRelativePath(), gitalyCfgs[0].Storages[0].Name, nil, nil, true, false))
+
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db))
repoStore := defaultRepoStore(praefectCfg)
txMgr := defaultTxMgr(praefectCfg)
nodeMgr, err := nodes.NewManager(testhelper.NewDiscardingLogEntry(t), praefectCfg, nil,
@@ -541,15 +548,15 @@ func TestRemoveRepository(t *testing.T) {
GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) {
return relativePath, nil, nil
},
+ GetRepositoryIDFunc: func(ctx context.Context, virtualStorage, relativePath string) (int64, error) {
+ return repositoryStore.GetRepositoryID(ctx, virtualStorage, relativePath)
+ },
},
withNodeMgr: nodeMgr,
withTxMgr: txMgr,
})
defer cleanup()
- virtualRepo := proto.Clone(repos[0][0]).(*gitalypb.Repository)
- virtualRepo.StorageName = praefectCfg.VirtualStorages[0].Name
-
_, err = gitalypb.NewRepositoryServiceClient(cc).RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{
Repository: virtualRepo,
})
@@ -616,30 +623,28 @@ func TestRenameRepository(t *testing.T) {
repoPaths[i] = filepath.Join(gitalyCfg.Storages[0].Path, relativePath)
}
- evq := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t)))
-
- tx := testdb.New(t).Begin(t)
- defer tx.Rollback(t)
+ db := testdb.New(t)
+ evq := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db))
- rs := datastore.NewPostgresRepositoryStore(tx, nil)
+ rs := datastore.NewPostgresRepositoryStore(db, nil)
require.NoError(t, rs.CreateRepository(ctx, 1, "praefect", repo.RelativePath, repo.RelativePath, "gitaly-1", []string{"gitaly-2", "gitaly-3"}, nil, true, false))
nodeSet, err := DialNodes(ctx, praefectCfg.VirtualStorages, nil, nil, nil, nil)
require.NoError(t, err)
defer nodeSet.Close()
- testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": praefectCfg.StorageNames()})
+ testdb.SetHealthyNodes(t, ctx, db, map[string]map[string][]string{"praefect": praefectCfg.StorageNames()})
cc, _, cleanup := runPraefectServer(t, ctx, praefectCfg, buildOptions{
withQueue: evq,
withRepoStore: rs,
withRouter: NewPerRepositoryRouter(
nodeSet.Connections(),
- nodes.NewPerRepositoryElector(tx),
+ nodes.NewPerRepositoryElector(db),
StaticHealthChecker(praefectCfg.StorageNames()),
NewLockedRandom(rand.New(rand.NewSource(0))),
rs,
- datastore.NewAssignmentStore(tx, praefectCfg.StorageNames()),
+ datastore.NewAssignmentStore(db, praefectCfg.StorageNames()),
rs,
nil,
),