diff options
-rw-r--r-- | internal/praefect/coordinator_pg_test.go | 17 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 25 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 73 | ||||
-rw-r--r-- | internal/praefect/server_test.go | 29 |
4 files changed, 84 insertions, 60 deletions
diff --git a/internal/praefect/coordinator_pg_test.go b/internal/praefect/coordinator_pg_test.go index e35da6b5d..c13fe0570 100644 --- a/internal/praefect/coordinator_pg_test.go +++ b/internal/praefect/coordinator_pg_test.go @@ -191,18 +191,11 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { txMgr := transactions.NewManager(conf) - tx := db.Begin(t) - defer tx.Rollback(t) - // set up the generations prior to transaction - rs := datastore.NewPostgresRepositoryStore(tx, conf.StorageNames()) + rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) repoCreated := false for i, n := range tc.nodes { - if n.generation == datastore.GenerationUnknown { - continue - } - if !repoCreated { repoCreated = true require.NoError(t, rs.CreateRepository(ctx, 1, repo.StorageName, repo.RelativePath, repo.RelativePath, storageNodes[i].Storage, nil, nil, true, false)) @@ -211,7 +204,7 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { require.NoError(t, rs.SetGeneration(ctx, 1, storageNodes[i].Storage, repo.RelativePath, n.generation)) } - testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": conf.StorageNames()}) + testdb.SetHealthyNodes(t, ctx, db, map[string]map[string][]string{"praefect": conf.StorageNames()}) nodeSet, err := DialNodes( ctx, @@ -229,11 +222,11 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { rs, NewPerRepositoryRouter( nodeSet.Connections(), - nodes.NewPerRepositoryElector(tx), + nodes.NewPerRepositoryElector(db), StaticHealthChecker(conf.StorageNames()), NewLockedRandom(rand.New(rand.NewSource(0))), rs, - datastore.NewAssignmentStore(tx, conf.StorageNames()), + datastore.NewAssignmentStore(db, conf.StorageNames()), rs, nil, ), @@ -315,8 +308,6 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { require.Equal(t, n.expectedGeneration, gen, "node %d has wrong generation", i) } - tx.Commit(t) - replicationWaitGroup.Wait() for i, node := range tc.nodes { diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 4749b3acb..0bb224d3d 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -189,7 +189,7 @@ func TestStreamDirectorMutator(t *testing.T) { } testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": conf.StorageNames()}) - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(tx)) queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) { assert.True(t, len(queueInterceptor.GetEnqueued()) < 2, "expected only one event to be created") return queue.Enqueue(ctx, event) @@ -322,8 +322,13 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) { txMgr := transactions.NewManager(conf) + db := testdb.New(t) + require.NoError(t, datastore.NewPostgresRepositoryStore(db, conf.StorageNames()). + CreateRepository(ctx, 1, repo.GetStorageName(), repo.GetRelativePath(), repo.GetRelativePath(), "primary", []string{"secondary"}, nil, true, false), + ) + coordinator := NewCoordinator( - datastore.NewPostgresReplicationEventQueue(testdb.New(t)), + datastore.NewPostgresReplicationEventQueue(db), rs, NewNodeManagerRouter(nodeMgr, rs), txMgr, @@ -869,7 +874,7 @@ func TestStreamDirector_repo_creation(t *testing.T) { assert.Equal(t, []string{unhealthySecondaryNode.Storage}, outdatedSecondaries) assert.Equal(t, tc.primaryStored, storePrimary) assert.Equal(t, tc.assignmentsStored, storeAssignments) - return nil + return datastore.NewPostgresRepositoryStore(db, conf.StorageNames()).CreateRepository(ctx, repoID, virtualStorage, relativePath, replicaPath, primary, updatedSecondaries, outdatedSecondaries, storePrimary, storeAssignments) }, } @@ -1078,7 +1083,8 @@ func TestAbsentCorrelationID(t *testing.T) { }, } - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) + db := testdb.New(t) + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) { assert.True(t, len(queueInterceptor.GetEnqueued()) < 2, "expected only one event to be created") return queue.Enqueue(ctx, event) @@ -1097,7 +1103,7 @@ func TestAbsentCorrelationID(t *testing.T) { defer nodeMgr.Stop() txMgr := transactions.NewManager(conf) - rs := datastore.MockRepositoryStore{} + rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) coordinator := NewCoordinator( queueInterceptor, @@ -1553,7 +1559,14 @@ func TestCoordinator_grpcErrorHandling(t *testing.T) { }) } + db := testdb.New(t) + queue := datastore.NewPostgresReplicationEventQueue(db) + require.NoError(t, datastore.NewPostgresRepositoryStore(db, praefectConfig.StorageNames()). + CreateRepository(ctx, 1, testhelper.DefaultStorageName, repoProto.GetRelativePath(), repoProto.GetRelativePath(), repoProto.GetStorageName(), nil, nil, true, false), + ) + praefectConn, _, cleanup := runPraefectServer(t, ctx, praefectConfig, buildOptions{ + withQueue: queue, // Set up a mock manager which sets up primary/secondaries and pretends that all nodes are // healthy. We need fixed roles and unhealthy nodes will not take part in transactions. withNodeMgr: &nodes.MockManager{ @@ -1569,7 +1582,7 @@ func TestCoordinator_grpcErrorHandling(t *testing.T) { }, nil }, }, - // Set up a mock repsoitory store pretending that all nodes are consistent. Only consistent + // Set up a mock repository store pretending that all nodes are consistent. Only consistent // nodes will take part in transactions. withRepoStore: datastore.MockRepositoryStore{ GetReplicaPathFunc: func(ctx context.Context, repositoryID int64) (string, error) { diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 7cd48ff3a..4018b51bd 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -56,6 +56,8 @@ func TestReplMgr_ProcessBacklog(t *testing.T) { } func testReplMgrProcessBacklog(t *testing.T, ctx context.Context) { + db := testdb.New(t) + primaryCfg, testRepoProto, testRepoPath := testcfg.BuildWithRepo(t, testcfg.WithStorages("primary")) testRepo := localrepo.NewTestRepo(t, primaryCfg, testRepoProto) primaryCfg.SocketPath = testserver.RunGitalyServer(t, primaryCfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) @@ -155,7 +157,10 @@ func testReplMgrProcessBacklog(t *testing.T, ctx context.Context) { logger := testhelper.NewDiscardingLogger(t) loggerHook := test.NewLocal(logger) - queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) + rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) + require.NoError(t, rs.CreateRepository(ctx, repositoryID, conf.VirtualStorages[0].Name, testRepo.GetRelativePath(), testRepo.GetRelativePath(), shard.Primary.GetStorage(), nil, nil, true, false)) + + queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) queue.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) { cancel() // when it is called we know that replication is finished return queue.Acknowledge(ctx, state, ids) @@ -165,10 +170,6 @@ func testReplMgrProcessBacklog(t *testing.T, ctx context.Context) { _, err = queue.Enqueue(ctx, events[0]) require.NoError(t, err) - db := testdb.New(t) - rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) - require.NoError(t, rs.CreateRepository(ctx, repositoryID, conf.VirtualStorages[0].Name, testRepo.GetRelativePath(), testRepo.GetRelativePath(), shard.Primary.GetStorage(), nil, nil, true, false)) - replMgr := NewReplMgr( loggerEntry, conf.StorageNames(), @@ -282,6 +283,8 @@ func TestReplicatorDowngradeAttempt(t *testing.T) { func TestReplicator_PropagateReplicationJob(t *testing.T) { t.Parallel() + db := testdb.New(t) + primaryStorage, secondaryStorage := "internal-gitaly-0", "internal-gitaly-1" primCfg := testcfg.Build(t, testcfg.WithStorages(primaryStorage)) @@ -315,7 +318,7 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) { // unlinkat /tmp/gitaly-222007427/381349228/storages.d/internal-gitaly-1/+gitaly/state/path/to/repo: directory not empty // By using WaitGroup we are sure the test cleanup will be started after all replication // requests are completed, so no running cache IO operations happen. - queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) + queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) var wg sync.WaitGroup queue.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) { wg.Add(1) @@ -375,6 +378,9 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) { StorageName: conf.VirtualStorages[0].Name, RelativePath: repositoryRelativePath, } + // We need to create repository record in the database before we can schedule replication events + repoStore := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) + require.NoError(t, repoStore.CreateRepository(ctx, 1, conf.VirtualStorages[0].Name, repository.GetRelativePath(), repository.GetRelativePath(), repository.GetStorageName(), nil, nil, true, false)) //nolint:staticcheck _, err = repositoryClient.GarbageCollect(ctx, &gitalypb.GarbageCollectRequest{ @@ -685,6 +691,10 @@ func TestProcessBacklog_FailedJobs(t *testing.T) { } func testProcessBacklogFailedJobs(t *testing.T, ctx context.Context) { + ctx, cancel := context.WithCancel(ctx) + t.Cleanup(cancel) + db := testdb.New(t) + primaryCfg, testRepo, _ := testcfg.BuildWithRepo(t, testcfg.WithStorages("default")) primaryAddr := testserver.RunGitalyServer(t, primaryCfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) @@ -714,26 +724,30 @@ func testProcessBacklogFailedJobs(t *testing.T, ctx context.Context) { }, }, } - ctx, cancel := context.WithCancel(ctx) - - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) // this job exists to verify that replication works okJob := datastore.ReplicationJob{ - RepositoryID: 1, - Change: datastore.UpdateRepo, + RepositoryID: 1, + // The optimize_repository is used because we can't use update type. + // All update events won't be picked as they will be marked as done once the first is over. + Change: datastore.OptimizeRepository, RelativePath: testRepo.RelativePath, TargetNodeStorage: secondary.Storage, - SourceNodeStorage: primary.Storage, VirtualStorage: "praefect", } + + rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) + require.NoError(t, rs.CreateRepository(ctx, okJob.RepositoryID, okJob.VirtualStorage, okJob.RelativePath, okJob.RelativePath, okJob.SourceNodeStorage, nil, nil, true, false)) + + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) event1, err := queueInterceptor.Enqueue(ctx, datastore.ReplicationEvent{Job: okJob}) require.NoError(t, err) require.Equal(t, uint64(1), event1.ID) - // this job checks flow for replication event that fails + // this job checks flow for replication event that fails because of not existing source storage failJob := okJob - failJob.Change = "invalid-operation" + failJob.Change = datastore.UpdateRepo + failJob.SourceNodeStorage = "invalid" event2, err := queueInterceptor.Enqueue(ctx, datastore.ReplicationEvent{Job: failJob}) require.NoError(t, err) require.Equal(t, uint64(2), event2.ID) @@ -745,10 +759,6 @@ func testProcessBacklogFailedJobs(t *testing.T, ctx context.Context) { nodeMgr.Start(0, time.Hour) defer nodeMgr.Stop() - db := testdb.New(t) - rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) - require.NoError(t, rs.CreateRepository(ctx, okJob.RepositoryID, okJob.VirtualStorage, okJob.RelativePath, okJob.RelativePath, okJob.SourceNodeStorage, nil, nil, true, false)) - replMgr := NewReplMgr( logEntry, conf.StorageNames(), @@ -792,6 +802,8 @@ func TestProcessBacklog_Success(t *testing.T) { func testProcessBacklogSuccess(t *testing.T, ctx context.Context) { ctx, cancel := context.WithCancel(ctx) + t.Cleanup(cancel) + db := testdb.New(t) primaryCfg, testRepo, _ := testcfg.BuildWithRepo(t, testcfg.WithStorages("primary")) primaryCfg.SocketPath = testserver.RunGitalyServer(t, primaryCfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) @@ -825,7 +837,7 @@ func testProcessBacklogSuccess(t *testing.T, ctx context.Context) { }, } - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) queueInterceptor.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) { ackIDs, err := queue.Acknowledge(ctx, state, ids) if len(ids) > 0 { @@ -854,6 +866,9 @@ func testProcessBacklogSuccess(t *testing.T, ctx context.Context) { }, } + rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) + require.NoError(t, rs.CreateRepository(ctx, eventType1.Job.RepositoryID, eventType1.Job.VirtualStorage, eventType1.Job.VirtualStorage, eventType1.Job.RelativePath, eventType1.Job.SourceNodeStorage, nil, nil, true, false)) + _, err := queueInterceptor.Enqueue(ctx, eventType1) require.NoError(t, err) @@ -869,6 +884,7 @@ func testProcessBacklogSuccess(t *testing.T, ctx context.Context) { // Rename replication job eventType2 := datastore.ReplicationEvent{ Job: datastore.ReplicationJob{ + RepositoryID: 1, Change: datastore.RenameRepo, RelativePath: testRepo.GetRelativePath(), TargetNodeStorage: secondary.Storage, @@ -884,6 +900,7 @@ func testProcessBacklogSuccess(t *testing.T, ctx context.Context) { // Rename replication job eventType3 := datastore.ReplicationEvent{ Job: datastore.ReplicationJob{ + RepositoryID: 1, Change: datastore.RenameRepo, RelativePath: renameTo1, TargetNodeStorage: secondary.Storage, @@ -904,10 +921,6 @@ func testProcessBacklogSuccess(t *testing.T, ctx context.Context) { nodeMgr.Start(0, time.Hour) defer nodeMgr.Stop() - db := testdb.New(t) - rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) - require.NoError(t, rs.CreateRepository(ctx, eventType1.Job.RepositoryID, eventType1.Job.VirtualStorage, eventType1.Job.VirtualStorage, eventType1.Job.RelativePath, eventType1.Job.SourceNodeStorage, nil, nil, true, false)) - replMgr := NewReplMgr( logEntry, conf.StorageNames(), @@ -934,6 +947,7 @@ func testProcessBacklogSuccess(t *testing.T, ctx context.Context) { func TestReplMgrProcessBacklog_OnlyHealthyNodes(t *testing.T) { t.Parallel() + db := testdb.New(t) conf := config.Config{ VirtualStorages: []*config.VirtualStorage{ { @@ -951,7 +965,7 @@ func TestReplMgrProcessBacklog_OnlyHealthyNodes(t *testing.T) { var mtx sync.Mutex expStorages := map[string]bool{conf.VirtualStorages[0].Nodes[0].Storage: true, conf.VirtualStorages[0].Nodes[2].Storage: true} - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) queueInterceptor.OnDequeue(func(_ context.Context, virtualStorageName string, storageName string, _ int, _ datastore.ReplicationEventQueue) ([]datastore.ReplicationEvent, error) { select { case <-ctx.Done(): @@ -1012,6 +1026,8 @@ func (m mockReplicator) Replicate(ctx context.Context, event datastore.Replicati func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) { t.Parallel() ctx, cancel := context.WithCancel(testhelper.Context(t)) + t.Cleanup(cancel) + db := testdb.New(t) const virtualStorage = "virtal-storage" const primaryStorage = "storage-1" @@ -1033,7 +1049,10 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) { }, } - queue := datastore.NewPostgresReplicationEventQueue(testdb.New(t)) + rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) + require.NoError(t, rs.CreateRepository(ctx, repositoryID, virtualStorage, "ignored", "ignored", primaryStorage, []string{secondaryStorage}, nil, true, false)) + + queue := datastore.NewPostgresReplicationEventQueue(db) _, err := queue.Enqueue(ctx, datastore.ReplicationEvent{ Job: datastore.ReplicationJob{ RepositoryID: 1, @@ -1046,10 +1065,6 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) { }) require.NoError(t, err) - db := testdb.New(t) - rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) - require.NoError(t, rs.CreateRepository(ctx, repositoryID, virtualStorage, "ignored", "ignored", primaryStorage, []string{secondaryStorage}, nil, true, false)) - replMgr := NewReplMgr( testhelper.NewDiscardingLogEntry(t), conf.StorageNames(), diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index c3681a34c..4de7e0fc6 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -521,7 +521,14 @@ func TestRemoveRepository(t *testing.T) { verifyReposExistence(t, codes.OK) - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) + virtualRepo := proto.Clone(repos[0][0]).(*gitalypb.Repository) + virtualRepo.StorageName = praefectCfg.VirtualStorages[0].Name + + db := testdb.New(t) + repositoryStore := datastore.NewPostgresRepositoryStore(db, praefectCfg.StorageNames()) + require.NoError(t, repositoryStore.CreateRepository(ctx, 1, virtualRepo.GetStorageName(), virtualRepo.GetRelativePath(), virtualRepo.GetRelativePath(), gitalyCfgs[0].Storages[0].Name, nil, nil, true, false)) + + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) repoStore := defaultRepoStore(praefectCfg) txMgr := defaultTxMgr(praefectCfg) nodeMgr, err := nodes.NewManager(testhelper.NewDiscardingLogEntry(t), praefectCfg, nil, @@ -541,15 +548,15 @@ func TestRemoveRepository(t *testing.T) { GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { return relativePath, nil, nil }, + GetRepositoryIDFunc: func(ctx context.Context, virtualStorage, relativePath string) (int64, error) { + return repositoryStore.GetRepositoryID(ctx, virtualStorage, relativePath) + }, }, withNodeMgr: nodeMgr, withTxMgr: txMgr, }) defer cleanup() - virtualRepo := proto.Clone(repos[0][0]).(*gitalypb.Repository) - virtualRepo.StorageName = praefectCfg.VirtualStorages[0].Name - _, err = gitalypb.NewRepositoryServiceClient(cc).RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{ Repository: virtualRepo, }) @@ -616,30 +623,28 @@ func TestRenameRepository(t *testing.T) { repoPaths[i] = filepath.Join(gitalyCfg.Storages[0].Path, relativePath) } - evq := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) - - tx := testdb.New(t).Begin(t) - defer tx.Rollback(t) + db := testdb.New(t) + evq := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) - rs := datastore.NewPostgresRepositoryStore(tx, nil) + rs := datastore.NewPostgresRepositoryStore(db, nil) require.NoError(t, rs.CreateRepository(ctx, 1, "praefect", repo.RelativePath, repo.RelativePath, "gitaly-1", []string{"gitaly-2", "gitaly-3"}, nil, true, false)) nodeSet, err := DialNodes(ctx, praefectCfg.VirtualStorages, nil, nil, nil, nil) require.NoError(t, err) defer nodeSet.Close() - testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": praefectCfg.StorageNames()}) + testdb.SetHealthyNodes(t, ctx, db, map[string]map[string][]string{"praefect": praefectCfg.StorageNames()}) cc, _, cleanup := runPraefectServer(t, ctx, praefectCfg, buildOptions{ withQueue: evq, withRepoStore: rs, withRouter: NewPerRepositoryRouter( nodeSet.Connections(), - nodes.NewPerRepositoryElector(tx), + nodes.NewPerRepositoryElector(db), StaticHealthChecker(praefectCfg.StorageNames()), NewLockedRandom(rand.New(rand.NewSource(0))), rs, - datastore.NewAssignmentStore(tx, praefectCfg.StorageNames()), + datastore.NewAssignmentStore(db, praefectCfg.StorageNames()), rs, nil, ), |