diff options
author | Pavlo Strokov <pstrokov@gitlab.com> | 2022-02-18 13:41:33 +0300 |
---|---|---|
committer | Pavlo Strokov <pstrokov@gitlab.com> | 2022-02-23 14:00:21 +0300 |
commit | bdd73865268ec2d237bf4cb6694c97e91bab8761 (patch) | |
tree | 4880d06c42cc8db3d9dbf88b7da46971d1f7bd0f | |
parent | c61dedb0af342fb1f21a2fa994aad9fb9873587c (diff) |
praefect: Repository should exist before scheduling eventsps-clean-queue-on-repo-removal
After introduction of the repository ID we use it
for various operations where the relative path and
virtual storage were used. One of the use cases is
creation of the replication events. The injected
repository ID is used to identify concrete repository
later. Because now we have a foreign key we are not
allowed to create replication events for repositories
that are not exist yet in the database. That is why
we use PostgresRepositoryStore to create a repository
records before we enqueue any replication events.
The change also contains fixes of some inconsistencies
in setup logic as double-database creation, unneeded
transactions usage.
-rw-r--r-- | internal/praefect/coordinator_pg_test.go | 17 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 25 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 73 | ||||
-rw-r--r-- | internal/praefect/server_test.go | 29 |
4 files changed, 84 insertions, 60 deletions
diff --git a/internal/praefect/coordinator_pg_test.go b/internal/praefect/coordinator_pg_test.go index e35da6b5d..c13fe0570 100644 --- a/internal/praefect/coordinator_pg_test.go +++ b/internal/praefect/coordinator_pg_test.go @@ -191,18 +191,11 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { txMgr := transactions.NewManager(conf) - tx := db.Begin(t) - defer tx.Rollback(t) - // set up the generations prior to transaction - rs := datastore.NewPostgresRepositoryStore(tx, conf.StorageNames()) + rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) repoCreated := false for i, n := range tc.nodes { - if n.generation == datastore.GenerationUnknown { - continue - } - if !repoCreated { repoCreated = true require.NoError(t, rs.CreateRepository(ctx, 1, repo.StorageName, repo.RelativePath, repo.RelativePath, storageNodes[i].Storage, nil, nil, true, false)) @@ -211,7 +204,7 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { require.NoError(t, rs.SetGeneration(ctx, 1, storageNodes[i].Storage, repo.RelativePath, n.generation)) } - testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": conf.StorageNames()}) + testdb.SetHealthyNodes(t, ctx, db, map[string]map[string][]string{"praefect": conf.StorageNames()}) nodeSet, err := DialNodes( ctx, @@ -229,11 +222,11 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { rs, NewPerRepositoryRouter( nodeSet.Connections(), - nodes.NewPerRepositoryElector(tx), + nodes.NewPerRepositoryElector(db), StaticHealthChecker(conf.StorageNames()), NewLockedRandom(rand.New(rand.NewSource(0))), rs, - datastore.NewAssignmentStore(tx, conf.StorageNames()), + datastore.NewAssignmentStore(db, conf.StorageNames()), rs, nil, ), @@ -315,8 +308,6 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { require.Equal(t, n.expectedGeneration, gen, "node %d has wrong generation", i) } - tx.Commit(t) - replicationWaitGroup.Wait() for i, node := range tc.nodes { diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 4749b3acb..0bb224d3d 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -189,7 +189,7 @@ func TestStreamDirectorMutator(t *testing.T) { } testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": conf.StorageNames()}) - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(tx)) queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) { assert.True(t, len(queueInterceptor.GetEnqueued()) < 2, "expected only one event to be created") return queue.Enqueue(ctx, event) @@ -322,8 +322,13 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) { txMgr := transactions.NewManager(conf) + db := testdb.New(t) + require.NoError(t, datastore.NewPostgresRepositoryStore(db, conf.StorageNames()). + CreateRepository(ctx, 1, repo.GetStorageName(), repo.GetRelativePath(), repo.GetRelativePath(), "primary", []string{"secondary"}, nil, true, false), + ) + coordinator := NewCoordinator( - datastore.NewPostgresReplicationEventQueue(testdb.New(t)), + datastore.NewPostgresReplicationEventQueue(db), rs, NewNodeManagerRouter(nodeMgr, rs), txMgr, @@ -869,7 +874,7 @@ func TestStreamDirector_repo_creation(t *testing.T) { assert.Equal(t, []string{unhealthySecondaryNode.Storage}, outdatedSecondaries) assert.Equal(t, tc.primaryStored, storePrimary) assert.Equal(t, tc.assignmentsStored, storeAssignments) - return nil + return datastore.NewPostgresRepositoryStore(db, conf.StorageNames()).CreateRepository(ctx, repoID, virtualStorage, relativePath, replicaPath, primary, updatedSecondaries, outdatedSecondaries, storePrimary, storeAssignments) }, } @@ -1078,7 +1083,8 @@ func TestAbsentCorrelationID(t *testing.T) { }, } - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) + db := testdb.New(t) + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) { assert.True(t, len(queueInterceptor.GetEnqueued()) < 2, "expected only one event to be created") return queue.Enqueue(ctx, event) @@ -1097,7 +1103,7 @@ func TestAbsentCorrelationID(t *testing.T) { defer nodeMgr.Stop() txMgr := transactions.NewManager(conf) - rs := datastore.MockRepositoryStore{} + rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) coordinator := NewCoordinator( queueInterceptor, @@ -1553,7 +1559,14 @@ func TestCoordinator_grpcErrorHandling(t *testing.T) { }) } + db := testdb.New(t) + queue := datastore.NewPostgresReplicationEventQueue(db) + require.NoError(t, datastore.NewPostgresRepositoryStore(db, praefectConfig.StorageNames()). + CreateRepository(ctx, 1, testhelper.DefaultStorageName, repoProto.GetRelativePath(), repoProto.GetRelativePath(), repoProto.GetStorageName(), nil, nil, true, false), + ) + praefectConn, _, cleanup := runPraefectServer(t, ctx, praefectConfig, buildOptions{ + withQueue: queue, // Set up a mock manager which sets up primary/secondaries and pretends that all nodes are // healthy. We need fixed roles and unhealthy nodes will not take part in transactions. withNodeMgr: &nodes.MockManager{ @@ -1569,7 +1582,7 @@ func TestCoordinator_grpcErrorHandling(t *testing.T) { }, nil }, }, - // Set up a mock repsoitory store pretending that all nodes are consistent. Only consistent + // Set up a mock repository store pretending that all nodes are consistent. Only consistent // nodes will take part in transactions. withRepoStore: datastore.MockRepositoryStore{ GetReplicaPathFunc: func(ctx context.Context, repositoryID int64) (string, error) { diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 7cd48ff3a..4018b51bd 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -56,6 +56,8 @@ func TestReplMgr_ProcessBacklog(t *testing.T) { } func testReplMgrProcessBacklog(t *testing.T, ctx context.Context) { + db := testdb.New(t) + primaryCfg, testRepoProto, testRepoPath := testcfg.BuildWithRepo(t, testcfg.WithStorages("primary")) testRepo := localrepo.NewTestRepo(t, primaryCfg, testRepoProto) primaryCfg.SocketPath = testserver.RunGitalyServer(t, primaryCfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) @@ -155,7 +157,10 @@ func testReplMgrProcessBacklog(t *testing.T, ctx context.Context) { logger := testhelper.NewDiscardingLogger(t) loggerHook := test.NewLocal(logger) - queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) + rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) + require.NoError(t, rs.CreateRepository(ctx, repositoryID, conf.VirtualStorages[0].Name, testRepo.GetRelativePath(), testRepo.GetRelativePath(), shard.Primary.GetStorage(), nil, nil, true, false)) + + queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) queue.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) { cancel() // when it is called we know that replication is finished return queue.Acknowledge(ctx, state, ids) @@ -165,10 +170,6 @@ func testReplMgrProcessBacklog(t *testing.T, ctx context.Context) { _, err = queue.Enqueue(ctx, events[0]) require.NoError(t, err) - db := testdb.New(t) - rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) - require.NoError(t, rs.CreateRepository(ctx, repositoryID, conf.VirtualStorages[0].Name, testRepo.GetRelativePath(), testRepo.GetRelativePath(), shard.Primary.GetStorage(), nil, nil, true, false)) - replMgr := NewReplMgr( loggerEntry, conf.StorageNames(), @@ -282,6 +283,8 @@ func TestReplicatorDowngradeAttempt(t *testing.T) { func TestReplicator_PropagateReplicationJob(t *testing.T) { t.Parallel() + db := testdb.New(t) + primaryStorage, secondaryStorage := "internal-gitaly-0", "internal-gitaly-1" primCfg := testcfg.Build(t, testcfg.WithStorages(primaryStorage)) @@ -315,7 +318,7 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) { // unlinkat /tmp/gitaly-222007427/381349228/storages.d/internal-gitaly-1/+gitaly/state/path/to/repo: directory not empty // By using WaitGroup we are sure the test cleanup will be started after all replication // requests are completed, so no running cache IO operations happen. - queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) + queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) var wg sync.WaitGroup queue.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) { wg.Add(1) @@ -375,6 +378,9 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) { StorageName: conf.VirtualStorages[0].Name, RelativePath: repositoryRelativePath, } + // We need to create repository record in the database before we can schedule replication events + repoStore := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) + require.NoError(t, repoStore.CreateRepository(ctx, 1, conf.VirtualStorages[0].Name, repository.GetRelativePath(), repository.GetRelativePath(), repository.GetStorageName(), nil, nil, true, false)) //nolint:staticcheck _, err = repositoryClient.GarbageCollect(ctx, &gitalypb.GarbageCollectRequest{ @@ -685,6 +691,10 @@ func TestProcessBacklog_FailedJobs(t *testing.T) { } func testProcessBacklogFailedJobs(t *testing.T, ctx context.Context) { + ctx, cancel := context.WithCancel(ctx) + t.Cleanup(cancel) + db := testdb.New(t) + primaryCfg, testRepo, _ := testcfg.BuildWithRepo(t, testcfg.WithStorages("default")) primaryAddr := testserver.RunGitalyServer(t, primaryCfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) @@ -714,26 +724,30 @@ func testProcessBacklogFailedJobs(t *testing.T, ctx context.Context) { }, }, } - ctx, cancel := context.WithCancel(ctx) - - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) // this job exists to verify that replication works okJob := datastore.ReplicationJob{ - RepositoryID: 1, - Change: datastore.UpdateRepo, + RepositoryID: 1, + // The optimize_repository is used because we can't use update type. + // All update events won't be picked as they will be marked as done once the first is over. + Change: datastore.OptimizeRepository, RelativePath: testRepo.RelativePath, TargetNodeStorage: secondary.Storage, - SourceNodeStorage: primary.Storage, VirtualStorage: "praefect", } + + rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) + require.NoError(t, rs.CreateRepository(ctx, okJob.RepositoryID, okJob.VirtualStorage, okJob.RelativePath, okJob.RelativePath, okJob.SourceNodeStorage, nil, nil, true, false)) + + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) event1, err := queueInterceptor.Enqueue(ctx, datastore.ReplicationEvent{Job: okJob}) require.NoError(t, err) require.Equal(t, uint64(1), event1.ID) - // this job checks flow for replication event that fails + // this job checks flow for replication event that fails because of not existing source storage failJob := okJob - failJob.Change = "invalid-operation" + failJob.Change = datastore.UpdateRepo + failJob.SourceNodeStorage = "invalid" event2, err := queueInterceptor.Enqueue(ctx, datastore.ReplicationEvent{Job: failJob}) require.NoError(t, err) require.Equal(t, uint64(2), event2.ID) @@ -745,10 +759,6 @@ func testProcessBacklogFailedJobs(t *testing.T, ctx context.Context) { nodeMgr.Start(0, time.Hour) defer nodeMgr.Stop() - db := testdb.New(t) - rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) - require.NoError(t, rs.CreateRepository(ctx, okJob.RepositoryID, okJob.VirtualStorage, okJob.RelativePath, okJob.RelativePath, okJob.SourceNodeStorage, nil, nil, true, false)) - replMgr := NewReplMgr( logEntry, conf.StorageNames(), @@ -792,6 +802,8 @@ func TestProcessBacklog_Success(t *testing.T) { func testProcessBacklogSuccess(t *testing.T, ctx context.Context) { ctx, cancel := context.WithCancel(ctx) + t.Cleanup(cancel) + db := testdb.New(t) primaryCfg, testRepo, _ := testcfg.BuildWithRepo(t, testcfg.WithStorages("primary")) primaryCfg.SocketPath = testserver.RunGitalyServer(t, primaryCfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) @@ -825,7 +837,7 @@ func testProcessBacklogSuccess(t *testing.T, ctx context.Context) { }, } - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) queueInterceptor.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) { ackIDs, err := queue.Acknowledge(ctx, state, ids) if len(ids) > 0 { @@ -854,6 +866,9 @@ func testProcessBacklogSuccess(t *testing.T, ctx context.Context) { }, } + rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) + require.NoError(t, rs.CreateRepository(ctx, eventType1.Job.RepositoryID, eventType1.Job.VirtualStorage, eventType1.Job.VirtualStorage, eventType1.Job.RelativePath, eventType1.Job.SourceNodeStorage, nil, nil, true, false)) + _, err := queueInterceptor.Enqueue(ctx, eventType1) require.NoError(t, err) @@ -869,6 +884,7 @@ func testProcessBacklogSuccess(t *testing.T, ctx context.Context) { // Rename replication job eventType2 := datastore.ReplicationEvent{ Job: datastore.ReplicationJob{ + RepositoryID: 1, Change: datastore.RenameRepo, RelativePath: testRepo.GetRelativePath(), TargetNodeStorage: secondary.Storage, @@ -884,6 +900,7 @@ func testProcessBacklogSuccess(t *testing.T, ctx context.Context) { // Rename replication job eventType3 := datastore.ReplicationEvent{ Job: datastore.ReplicationJob{ + RepositoryID: 1, Change: datastore.RenameRepo, RelativePath: renameTo1, TargetNodeStorage: secondary.Storage, @@ -904,10 +921,6 @@ func testProcessBacklogSuccess(t *testing.T, ctx context.Context) { nodeMgr.Start(0, time.Hour) defer nodeMgr.Stop() - db := testdb.New(t) - rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) - require.NoError(t, rs.CreateRepository(ctx, eventType1.Job.RepositoryID, eventType1.Job.VirtualStorage, eventType1.Job.VirtualStorage, eventType1.Job.RelativePath, eventType1.Job.SourceNodeStorage, nil, nil, true, false)) - replMgr := NewReplMgr( logEntry, conf.StorageNames(), @@ -934,6 +947,7 @@ func testProcessBacklogSuccess(t *testing.T, ctx context.Context) { func TestReplMgrProcessBacklog_OnlyHealthyNodes(t *testing.T) { t.Parallel() + db := testdb.New(t) conf := config.Config{ VirtualStorages: []*config.VirtualStorage{ { @@ -951,7 +965,7 @@ func TestReplMgrProcessBacklog_OnlyHealthyNodes(t *testing.T) { var mtx sync.Mutex expStorages := map[string]bool{conf.VirtualStorages[0].Nodes[0].Storage: true, conf.VirtualStorages[0].Nodes[2].Storage: true} - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) queueInterceptor.OnDequeue(func(_ context.Context, virtualStorageName string, storageName string, _ int, _ datastore.ReplicationEventQueue) ([]datastore.ReplicationEvent, error) { select { case <-ctx.Done(): @@ -1012,6 +1026,8 @@ func (m mockReplicator) Replicate(ctx context.Context, event datastore.Replicati func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) { t.Parallel() ctx, cancel := context.WithCancel(testhelper.Context(t)) + t.Cleanup(cancel) + db := testdb.New(t) const virtualStorage = "virtal-storage" const primaryStorage = "storage-1" @@ -1033,7 +1049,10 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) { }, } - queue := datastore.NewPostgresReplicationEventQueue(testdb.New(t)) + rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) + require.NoError(t, rs.CreateRepository(ctx, repositoryID, virtualStorage, "ignored", "ignored", primaryStorage, []string{secondaryStorage}, nil, true, false)) + + queue := datastore.NewPostgresReplicationEventQueue(db) _, err := queue.Enqueue(ctx, datastore.ReplicationEvent{ Job: datastore.ReplicationJob{ RepositoryID: 1, @@ -1046,10 +1065,6 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) { }) require.NoError(t, err) - db := testdb.New(t) - rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) - require.NoError(t, rs.CreateRepository(ctx, repositoryID, virtualStorage, "ignored", "ignored", primaryStorage, []string{secondaryStorage}, nil, true, false)) - replMgr := NewReplMgr( testhelper.NewDiscardingLogEntry(t), conf.StorageNames(), diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index c3681a34c..4de7e0fc6 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -521,7 +521,14 @@ func TestRemoveRepository(t *testing.T) { verifyReposExistence(t, codes.OK) - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) + virtualRepo := proto.Clone(repos[0][0]).(*gitalypb.Repository) + virtualRepo.StorageName = praefectCfg.VirtualStorages[0].Name + + db := testdb.New(t) + repositoryStore := datastore.NewPostgresRepositoryStore(db, praefectCfg.StorageNames()) + require.NoError(t, repositoryStore.CreateRepository(ctx, 1, virtualRepo.GetStorageName(), virtualRepo.GetRelativePath(), virtualRepo.GetRelativePath(), gitalyCfgs[0].Storages[0].Name, nil, nil, true, false)) + + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) repoStore := defaultRepoStore(praefectCfg) txMgr := defaultTxMgr(praefectCfg) nodeMgr, err := nodes.NewManager(testhelper.NewDiscardingLogEntry(t), praefectCfg, nil, @@ -541,15 +548,15 @@ func TestRemoveRepository(t *testing.T) { GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { return relativePath, nil, nil }, + GetRepositoryIDFunc: func(ctx context.Context, virtualStorage, relativePath string) (int64, error) { + return repositoryStore.GetRepositoryID(ctx, virtualStorage, relativePath) + }, }, withNodeMgr: nodeMgr, withTxMgr: txMgr, }) defer cleanup() - virtualRepo := proto.Clone(repos[0][0]).(*gitalypb.Repository) - virtualRepo.StorageName = praefectCfg.VirtualStorages[0].Name - _, err = gitalypb.NewRepositoryServiceClient(cc).RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{ Repository: virtualRepo, }) @@ -616,30 +623,28 @@ func TestRenameRepository(t *testing.T) { repoPaths[i] = filepath.Join(gitalyCfg.Storages[0].Path, relativePath) } - evq := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) - - tx := testdb.New(t).Begin(t) - defer tx.Rollback(t) + db := testdb.New(t) + evq := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) - rs := datastore.NewPostgresRepositoryStore(tx, nil) + rs := datastore.NewPostgresRepositoryStore(db, nil) require.NoError(t, rs.CreateRepository(ctx, 1, "praefect", repo.RelativePath, repo.RelativePath, "gitaly-1", []string{"gitaly-2", "gitaly-3"}, nil, true, false)) nodeSet, err := DialNodes(ctx, praefectCfg.VirtualStorages, nil, nil, nil, nil) require.NoError(t, err) defer nodeSet.Close() - testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": praefectCfg.StorageNames()}) + testdb.SetHealthyNodes(t, ctx, db, map[string]map[string][]string{"praefect": praefectCfg.StorageNames()}) cc, _, cleanup := runPraefectServer(t, ctx, praefectCfg, buildOptions{ withQueue: evq, withRepoStore: rs, withRouter: NewPerRepositoryRouter( nodeSet.Connections(), - nodes.NewPerRepositoryElector(tx), + nodes.NewPerRepositoryElector(db), StaticHealthChecker(praefectCfg.StorageNames()), NewLockedRandom(rand.New(rand.NewSource(0))), rs, - datastore.NewAssignmentStore(tx, praefectCfg.StorageNames()), + datastore.NewAssignmentStore(db, praefectCfg.StorageNames()), rs, nil, ), |