diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2022-04-25 13:13:21 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2022-04-27 10:15:46 +0300 |
commit | 64d9727a224d703da8d1afbabfd95573af6ef212 (patch) | |
tree | e1e1a520e421a008bafdaa61cd2d275a00126af1 | |
parent | 813f937448f3e5bf859a137bb487828db9085213 (diff) |
replicator: Refactor tests exercising replication of maintenance RPCspks-praefect-remove-maintenance-event-creation
The replicator used to host logic to replicate maintenance-style RPCs,
and thus it also had some tests which exercised this logic. Now that
replication of those RPCs has been removed in favor of a best-effort
routing strategy, we're essentially not testing the replicator anymore
though, but the coordinator.
Move the test into the coordinator's test suite. While at it, refactor
it to be table-driven so that it's more readable and convert the setup
to use `runPraefectServer()`.
-rw-r--r-- | internal/praefect/coordinator_test.go | 289 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 345 |
2 files changed, 289 insertions, 345 deletions
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 3b4d53d15..fce4d5e5b 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -19,6 +19,8 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/client" "gitlab.com/gitlab-org/gitaly/v14/internal/cache" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" + gconfig "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" gitaly_metadata "gitlab.com/gitlab-org/gitaly/v14/internal/metadata" @@ -549,6 +551,293 @@ func TestStreamDirector_maintenance(t *testing.T) { } } +func TestStreamDirector_maintenanceRPCs(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + + primaryStorage := "internal-gitaly-0" + primaryServer, primarySocketPath := runMockMaintenanceServer( + t, testcfg.Build(t, testcfg.WithStorages(primaryStorage)), + ) + + secondaryStorage := "internal-gitaly-1" + secondaryServer, secondarySocketPath := runMockMaintenanceServer(t, testcfg.Build( + t, testcfg.WithStorages(secondaryStorage)), + ) + + cc, _, cleanup := runPraefectServer(t, ctx, config.Config{ + VirtualStorages: []*config.VirtualStorage{ + { + Name: "default", + Nodes: []*config.Node{ + { + Storage: primaryStorage, + Address: primarySocketPath, + }, + { + Storage: secondaryStorage, + Address: secondarySocketPath, + }, + }, + }, + }, + }, buildOptions{}) + defer cleanup() + + repository := &gitalypb.Repository{ + StorageName: "default", + RelativePath: gittest.NewRepositoryName(t, true), + } + primaryRepository := &gitalypb.Repository{ + StorageName: primaryStorage, + RelativePath: repository.RelativePath, + } + secondaryRepository := &gitalypb.Repository{ + StorageName: secondaryStorage, + RelativePath: repository.RelativePath, + } + + repositoryClient := gitalypb.NewRepositoryServiceClient(cc) + refClient := gitalypb.NewRefServiceClient(cc) + + for _, tc := range []struct { + desc string + maintenanceFunc func(t *testing.T) + expectedPrimaryRequest proto.Message + expectedSecondaryRequest proto.Message + }{ + { + desc: "GarbageCollect", + maintenanceFunc: func(t *testing.T) { + //nolint:staticcheck + _, err := repositoryClient.GarbageCollect(ctx, &gitalypb.GarbageCollectRequest{ + Repository: repository, + CreateBitmap: true, + Prune: true, + }) + require.NoError(t, err) + }, + expectedPrimaryRequest: &gitalypb.GarbageCollectRequest{ + Repository: primaryRepository, + CreateBitmap: true, + Prune: true, + }, + expectedSecondaryRequest: &gitalypb.GarbageCollectRequest{ + Repository: secondaryRepository, + CreateBitmap: true, + Prune: true, + }, + }, + { + desc: "RepackFull", + maintenanceFunc: func(t *testing.T) { + //nolint:staticcheck + _, err := repositoryClient.RepackFull(ctx, &gitalypb.RepackFullRequest{ + Repository: repository, + CreateBitmap: true, + }) + require.NoError(t, err) + }, + expectedPrimaryRequest: &gitalypb.RepackFullRequest{ + Repository: primaryRepository, + CreateBitmap: true, + }, + expectedSecondaryRequest: &gitalypb.RepackFullRequest{ + Repository: secondaryRepository, + CreateBitmap: true, + }, + }, + { + desc: "RepackIncremental", + maintenanceFunc: func(t *testing.T) { + //nolint:staticcheck + _, err := repositoryClient.RepackIncremental(ctx, &gitalypb.RepackIncrementalRequest{ + Repository: repository, + }) + require.NoError(t, err) + }, + expectedPrimaryRequest: &gitalypb.RepackIncrementalRequest{ + Repository: primaryRepository, + }, + expectedSecondaryRequest: &gitalypb.RepackIncrementalRequest{ + Repository: secondaryRepository, + }, + }, + { + desc: "Cleanup", + maintenanceFunc: func(t *testing.T) { + //nolint:staticcheck + _, err := repositoryClient.Cleanup(ctx, &gitalypb.CleanupRequest{ + Repository: repository, + }) + require.NoError(t, err) + }, + expectedPrimaryRequest: &gitalypb.CleanupRequest{ + Repository: primaryRepository, + }, + expectedSecondaryRequest: &gitalypb.CleanupRequest{ + Repository: secondaryRepository, + }, + }, + { + desc: "WriteCommitGraph", + maintenanceFunc: func(t *testing.T) { + //nolint:staticcheck + _, err := repositoryClient.WriteCommitGraph(ctx, &gitalypb.WriteCommitGraphRequest{ + Repository: repository, + // This is not a valid split strategy, but we currently only support a + // single default split strategy with value 0. So we just test with an + // invalid split strategy to check that a non-default value gets properly + // replicated. + SplitStrategy: 1, + }) + require.NoError(t, err) + }, + expectedPrimaryRequest: &gitalypb.WriteCommitGraphRequest{ + Repository: primaryRepository, + SplitStrategy: 1, + }, + expectedSecondaryRequest: &gitalypb.WriteCommitGraphRequest{ + Repository: secondaryRepository, + SplitStrategy: 1, + }, + }, + { + desc: "MidxRepack", + maintenanceFunc: func(t *testing.T) { + //nolint:staticcheck + _, err := repositoryClient.MidxRepack(ctx, &gitalypb.MidxRepackRequest{ + Repository: repository, + }) + require.NoError(t, err) + }, + expectedPrimaryRequest: &gitalypb.MidxRepackRequest{ + Repository: primaryRepository, + }, + expectedSecondaryRequest: &gitalypb.MidxRepackRequest{ + Repository: secondaryRepository, + }, + }, + { + desc: "OptimizeRepository", + maintenanceFunc: func(t *testing.T) { + _, err := repositoryClient.OptimizeRepository(ctx, &gitalypb.OptimizeRepositoryRequest{ + Repository: repository, + }) + require.NoError(t, err) + }, + expectedPrimaryRequest: &gitalypb.OptimizeRepositoryRequest{ + Repository: primaryRepository, + }, + expectedSecondaryRequest: &gitalypb.OptimizeRepositoryRequest{ + Repository: secondaryRepository, + }, + }, + { + desc: "PruneUnreachableObjects", + maintenanceFunc: func(t *testing.T) { + _, err := repositoryClient.PruneUnreachableObjects(ctx, &gitalypb.PruneUnreachableObjectsRequest{ + Repository: repository, + }) + require.NoError(t, err) + }, + expectedPrimaryRequest: &gitalypb.PruneUnreachableObjectsRequest{ + Repository: primaryRepository, + }, + expectedSecondaryRequest: &gitalypb.PruneUnreachableObjectsRequest{ + Repository: secondaryRepository, + }, + }, + { + desc: "PackRefs", + maintenanceFunc: func(t *testing.T) { + //nolint:staticcheck + _, err := refClient.PackRefs(ctx, &gitalypb.PackRefsRequest{ + Repository: repository, + }) + require.NoError(t, err) + }, + expectedPrimaryRequest: &gitalypb.PackRefsRequest{ + Repository: primaryRepository, + }, + expectedSecondaryRequest: &gitalypb.PackRefsRequest{ + Repository: secondaryRepository, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + tc.maintenanceFunc(t) + testhelper.ProtoEqual(t, tc.expectedPrimaryRequest, <-primaryServer.requestCh) + testhelper.ProtoEqual(t, tc.expectedSecondaryRequest, <-secondaryServer.requestCh) + }) + } +} + +type mockMaintenanceServer struct { + requestCh chan proto.Message + gitalypb.UnimplementedRepositoryServiceServer + gitalypb.UnimplementedRefServiceServer +} + +func runMockMaintenanceServer(t *testing.T, cfg gconfig.Cfg) (*mockMaintenanceServer, string) { + server := &mockMaintenanceServer{ + requestCh: make(chan proto.Message, 1), + } + + addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + gitalypb.RegisterRepositoryServiceServer(srv, server) + gitalypb.RegisterRefServiceServer(srv, server) + }, testserver.WithDisablePraefect()) + + return server, addr +} + +func (m *mockMaintenanceServer) GarbageCollect(ctx context.Context, in *gitalypb.GarbageCollectRequest) (*gitalypb.GarbageCollectResponse, error) { + m.requestCh <- in + return &gitalypb.GarbageCollectResponse{}, nil +} + +func (m *mockMaintenanceServer) RepackFull(ctx context.Context, in *gitalypb.RepackFullRequest) (*gitalypb.RepackFullResponse, error) { + m.requestCh <- in + return &gitalypb.RepackFullResponse{}, nil +} + +func (m *mockMaintenanceServer) RepackIncremental(ctx context.Context, in *gitalypb.RepackIncrementalRequest) (*gitalypb.RepackIncrementalResponse, error) { + m.requestCh <- in + return &gitalypb.RepackIncrementalResponse{}, nil +} + +func (m *mockMaintenanceServer) Cleanup(ctx context.Context, in *gitalypb.CleanupRequest) (*gitalypb.CleanupResponse, error) { + m.requestCh <- in + return &gitalypb.CleanupResponse{}, nil +} + +func (m *mockMaintenanceServer) WriteCommitGraph(ctx context.Context, in *gitalypb.WriteCommitGraphRequest) (*gitalypb.WriteCommitGraphResponse, error) { + m.requestCh <- in + return &gitalypb.WriteCommitGraphResponse{}, nil +} + +func (m *mockMaintenanceServer) MidxRepack(ctx context.Context, in *gitalypb.MidxRepackRequest) (*gitalypb.MidxRepackResponse, error) { + m.requestCh <- in + return &gitalypb.MidxRepackResponse{}, nil +} + +func (m *mockMaintenanceServer) OptimizeRepository(ctx context.Context, in *gitalypb.OptimizeRepositoryRequest) (*gitalypb.OptimizeRepositoryResponse, error) { + m.requestCh <- in + return &gitalypb.OptimizeRepositoryResponse{}, nil +} + +func (m *mockMaintenanceServer) PruneUnreachableObjects(ctx context.Context, in *gitalypb.PruneUnreachableObjectsRequest) (*gitalypb.PruneUnreachableObjectsResponse, error) { + m.requestCh <- in + return &gitalypb.PruneUnreachableObjectsResponse{}, nil +} + +func (m *mockMaintenanceServer) PackRefs(ctx context.Context, in *gitalypb.PackRefsRequest) (*gitalypb.PackRefsResponse, error) { + m.requestCh <- in + return &gitalypb.PackRefsResponse{}, nil +} + type mockRouter struct { Router routeRepositoryAccessorFunc func(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RepositoryAccessorRoute, error) diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 89e5d342c..dd91d20b8 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -2,7 +2,6 @@ package praefect import ( "context" - "fmt" "path/filepath" "strings" "sync" @@ -22,7 +21,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v14/internal/git/objectpool" gconfig "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" - "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction" @@ -32,7 +30,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/transactions" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/promtest" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" @@ -279,348 +276,6 @@ func TestReplicatorDowngradeAttempt(t *testing.T) { } } -func TestReplicator_PropagateReplicationJob(t *testing.T) { - t.Parallel() - - ctx := testhelper.Context(t) - primaryStorage, secondaryStorage := "internal-gitaly-0", "internal-gitaly-1" - - primCfg := testcfg.Build(t, testcfg.WithStorages(primaryStorage)) - primaryServer, primarySocketPath := runMockRepositoryServer(t, primCfg) - - secCfg := testcfg.Build(t, testcfg.WithStorages(secondaryStorage)) - secondaryServer, secondarySocketPath := runMockRepositoryServer(t, secCfg) - - conf := config.Config{ - VirtualStorages: []*config.VirtualStorage{ - { - Name: "default", - Nodes: []*config.Node{ - { - Storage: primaryStorage, - Address: primarySocketPath, - }, - { - Storage: secondaryStorage, - Address: secondarySocketPath, - }, - }, - }, - }, - } - - // We need to await for the replication event to make a complete roundtrip to the remote. - // Because send to the channel happens during in-flight request there are ongoing filesystem - // operations related to caching. The cleanup happens before all IO cache operations finished - // those resulting to: - // unlinkat /tmp/gitaly-222007427/381349228/storages.d/internal-gitaly-1/+gitaly/state/path/to/repo: directory not empty - // By using WaitGroup we are sure the test cleanup will be started after all replication - // requests are completed, so no running cache IO operations happen. - queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) - - var wg sync.WaitGroup - // When maintenance operation routing is enabled we don't expect to see any - // replication events. The observed behaviour should still be the same though: we - // expect to observe the RPC calls on both the primary and secondary node because we - // route them to both at the same time. - queue.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) { - require.FailNow(t, "no replication jobs should have been created") - return datastore.ReplicationEvent{}, fmt.Errorf("unexpected enqueue") - }) - - logEntry := testhelper.NewDiscardingLogEntry(t) - - nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil) - require.NoError(t, err) - nodeMgr.Start(0, time.Hour) - defer nodeMgr.Stop() - - txMgr := transactions.NewManager(conf) - - repositoryRelativePath := "/path/to/repo" - - rs := datastore.MockRepositoryStore{ - GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { - return repositoryRelativePath, nil, nil - }, - GetReplicaPathFunc: func(ctx context.Context, repositoryID int64) (string, error) { - return repositoryRelativePath, nil - }, - } - - coordinator := NewCoordinator( - queue, - rs, - NewNodeManagerRouter(nodeMgr, rs), - txMgr, - conf, - protoregistry.GitalyProtoPreregistered, - ) - - replmgr := NewReplMgr(logEntry, conf.StorageNames(), queue, rs, nodeMgr, NodeSetFromNodeManager(nodeMgr)) - - prf := NewGRPCServer(conf, logEntry, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, txMgr, rs, nil, nil, nil, nil) - - listener, port := listenAvailPort(t) - ctx, cancel := context.WithCancel(ctx) - - go prf.Serve(listener) - defer prf.Stop() - - cc := dialLocalPort(t, port, false) - repositoryClient := gitalypb.NewRepositoryServiceClient(cc) - refClient := gitalypb.NewRefServiceClient(cc) - defer listener.Close() - defer cc.Close() - - repository := &gitalypb.Repository{ - StorageName: conf.VirtualStorages[0].Name, - RelativePath: repositoryRelativePath, - } - - //nolint:staticcheck - _, err = repositoryClient.GarbageCollect(ctx, &gitalypb.GarbageCollectRequest{ - Repository: repository, - CreateBitmap: true, - Prune: true, - }) - require.NoError(t, err) - - //nolint:staticcheck - _, err = repositoryClient.RepackFull(ctx, &gitalypb.RepackFullRequest{Repository: repository, CreateBitmap: false}) - require.NoError(t, err) - - //nolint:staticcheck - _, err = repositoryClient.RepackIncremental(ctx, &gitalypb.RepackIncrementalRequest{Repository: repository}) - require.NoError(t, err) - - //nolint:staticcheck - _, err = repositoryClient.Cleanup(ctx, &gitalypb.CleanupRequest{Repository: repository}) - require.NoError(t, err) - - //nolint:staticcheck - _, err = repositoryClient.WriteCommitGraph(ctx, &gitalypb.WriteCommitGraphRequest{ - Repository: repository, - // This is not a valid split strategy, but we currently only support a - // single default split strategy with value 0. So we just test with an - // invalid split strategy to check that a non-default value gets properly - // replicated. - SplitStrategy: 1, - }) - require.NoError(t, err) - - //nolint:staticcheck - _, err = repositoryClient.MidxRepack(ctx, &gitalypb.MidxRepackRequest{Repository: repository}) - require.NoError(t, err) - - _, err = repositoryClient.OptimizeRepository(ctx, &gitalypb.OptimizeRepositoryRequest{Repository: repository}) - require.NoError(t, err) - - _, err = repositoryClient.PruneUnreachableObjects(ctx, &gitalypb.PruneUnreachableObjectsRequest{Repository: repository}) - require.NoError(t, err) - - //nolint:staticcheck - _, err = refClient.PackRefs(ctx, &gitalypb.PackRefsRequest{ - Repository: repository, - }) - require.NoError(t, err) - - primaryRepository := &gitalypb.Repository{StorageName: primaryStorage, RelativePath: repositoryRelativePath} - expectedPrimaryGcReq := &gitalypb.GarbageCollectRequest{ - Repository: primaryRepository, - CreateBitmap: true, - Prune: true, - } - expectedPrimaryRepackFullReq := &gitalypb.RepackFullRequest{ - Repository: primaryRepository, - CreateBitmap: false, - } - expectedPrimaryRepackIncrementalReq := &gitalypb.RepackIncrementalRequest{ - Repository: primaryRepository, - } - expectedPrimaryCleanup := &gitalypb.CleanupRequest{ - Repository: primaryRepository, - } - expectedPrimaryWriteCommitGraph := &gitalypb.WriteCommitGraphRequest{ - Repository: primaryRepository, - SplitStrategy: 1, - } - expectedPrimaryMidxRepack := &gitalypb.MidxRepackRequest{ - Repository: primaryRepository, - } - expectedPrimaryOptimizeRepository := &gitalypb.OptimizeRepositoryRequest{ - Repository: primaryRepository, - } - expectedPruneUnreachableObjects := &gitalypb.PruneUnreachableObjectsRequest{ - Repository: primaryRepository, - } - expectedPrimaryPackRefs := &gitalypb.PackRefsRequest{ - Repository: primaryRepository, - } - - replMgrDone := startProcessBacklog(ctx, replmgr) - - // ensure primary gitaly server received the expected requests - waitForRequest(t, primaryServer.gcChan, expectedPrimaryGcReq, 5*time.Second) - waitForRequest(t, primaryServer.repackIncrChan, expectedPrimaryRepackIncrementalReq, 5*time.Second) - waitForRequest(t, primaryServer.repackFullChan, expectedPrimaryRepackFullReq, 5*time.Second) - waitForRequest(t, primaryServer.cleanupChan, expectedPrimaryCleanup, 5*time.Second) - waitForRequest(t, primaryServer.writeCommitGraphChan, expectedPrimaryWriteCommitGraph, 5*time.Second) - waitForRequest(t, primaryServer.midxRepackChan, expectedPrimaryMidxRepack, 5*time.Second) - waitForRequest(t, primaryServer.optimizeRepositoryChan, expectedPrimaryOptimizeRepository, 5*time.Second) - waitForRequest(t, primaryServer.pruneUnreachableObjectsChan, expectedPruneUnreachableObjects, 5*time.Second) - waitForRequest(t, primaryServer.packRefsChan, expectedPrimaryPackRefs, 5*time.Second) - - secondaryRepository := &gitalypb.Repository{StorageName: secondaryStorage, RelativePath: repositoryRelativePath} - - expectedSecondaryGcReq := expectedPrimaryGcReq - expectedSecondaryGcReq.Repository = secondaryRepository - - expectedSecondaryRepackFullReq := expectedPrimaryRepackFullReq - expectedSecondaryRepackFullReq.Repository = secondaryRepository - - expectedSecondaryRepackIncrementalReq := expectedPrimaryRepackIncrementalReq - expectedSecondaryRepackIncrementalReq.Repository = secondaryRepository - - expectedSecondaryCleanup := expectedPrimaryCleanup - expectedSecondaryCleanup.Repository = secondaryRepository - - expectedSecondaryWriteCommitGraph := expectedPrimaryWriteCommitGraph - expectedSecondaryWriteCommitGraph.Repository = secondaryRepository - - expectedSecondaryMidxRepack := expectedPrimaryMidxRepack - expectedSecondaryMidxRepack.Repository = secondaryRepository - - expectedSecondaryOptimizeRepository := expectedPrimaryOptimizeRepository - expectedSecondaryOptimizeRepository.Repository = secondaryRepository - - expectedSecondaryPruneUnreachableObjects := expectedPruneUnreachableObjects - expectedSecondaryPruneUnreachableObjects.Repository = secondaryRepository - - expectedSecondaryPackRefs := expectedPrimaryPackRefs - expectedSecondaryPackRefs.Repository = secondaryRepository - - // ensure secondary gitaly server received the expected requests - waitForRequest(t, secondaryServer.gcChan, expectedSecondaryGcReq, 5*time.Second) - waitForRequest(t, secondaryServer.repackIncrChan, expectedSecondaryRepackIncrementalReq, 5*time.Second) - waitForRequest(t, secondaryServer.repackFullChan, expectedSecondaryRepackFullReq, 5*time.Second) - waitForRequest(t, secondaryServer.cleanupChan, expectedSecondaryCleanup, 5*time.Second) - waitForRequest(t, secondaryServer.writeCommitGraphChan, expectedSecondaryWriteCommitGraph, 5*time.Second) - waitForRequest(t, secondaryServer.midxRepackChan, expectedSecondaryMidxRepack, 5*time.Second) - waitForRequest(t, secondaryServer.optimizeRepositoryChan, expectedSecondaryOptimizeRepository, 5*time.Second) - waitForRequest(t, secondaryServer.pruneUnreachableObjectsChan, expectedSecondaryPruneUnreachableObjects, 5*time.Second) - waitForRequest(t, secondaryServer.packRefsChan, expectedSecondaryPackRefs, 5*time.Second) - wg.Wait() - cancel() - <-replMgrDone -} - -type mockServer struct { - gcChan, repackFullChan, repackIncrChan, cleanupChan, writeCommitGraphChan, midxRepackChan, optimizeRepositoryChan, pruneUnreachableObjectsChan, packRefsChan chan proto.Message - - gitalypb.UnimplementedRepositoryServiceServer - gitalypb.UnimplementedRefServiceServer -} - -func newMockRepositoryServer() *mockServer { - return &mockServer{ - gcChan: make(chan proto.Message), - repackFullChan: make(chan proto.Message), - repackIncrChan: make(chan proto.Message), - cleanupChan: make(chan proto.Message), - writeCommitGraphChan: make(chan proto.Message), - midxRepackChan: make(chan proto.Message), - optimizeRepositoryChan: make(chan proto.Message), - pruneUnreachableObjectsChan: make(chan proto.Message), - packRefsChan: make(chan proto.Message), - } -} - -func (m *mockServer) GarbageCollect(ctx context.Context, in *gitalypb.GarbageCollectRequest) (*gitalypb.GarbageCollectResponse, error) { - go func() { - m.gcChan <- in - }() - return &gitalypb.GarbageCollectResponse{}, nil -} - -func (m *mockServer) RepackFull(ctx context.Context, in *gitalypb.RepackFullRequest) (*gitalypb.RepackFullResponse, error) { - go func() { - m.repackFullChan <- in - }() - return &gitalypb.RepackFullResponse{}, nil -} - -func (m *mockServer) RepackIncremental(ctx context.Context, in *gitalypb.RepackIncrementalRequest) (*gitalypb.RepackIncrementalResponse, error) { - go func() { - m.repackIncrChan <- in - }() - return &gitalypb.RepackIncrementalResponse{}, nil -} - -func (m *mockServer) Cleanup(ctx context.Context, in *gitalypb.CleanupRequest) (*gitalypb.CleanupResponse, error) { - go func() { - m.cleanupChan <- in - }() - return &gitalypb.CleanupResponse{}, nil -} - -func (m *mockServer) WriteCommitGraph(ctx context.Context, in *gitalypb.WriteCommitGraphRequest) (*gitalypb.WriteCommitGraphResponse, error) { - go func() { - m.writeCommitGraphChan <- in - }() - return &gitalypb.WriteCommitGraphResponse{}, nil -} - -func (m *mockServer) MidxRepack(ctx context.Context, in *gitalypb.MidxRepackRequest) (*gitalypb.MidxRepackResponse, error) { - go func() { - m.midxRepackChan <- in - }() - return &gitalypb.MidxRepackResponse{}, nil -} - -func (m *mockServer) OptimizeRepository(ctx context.Context, in *gitalypb.OptimizeRepositoryRequest) (*gitalypb.OptimizeRepositoryResponse, error) { - go func() { - m.optimizeRepositoryChan <- in - }() - return &gitalypb.OptimizeRepositoryResponse{}, nil -} - -func (m *mockServer) PruneUnreachableObjects(ctx context.Context, in *gitalypb.PruneUnreachableObjectsRequest) (*gitalypb.PruneUnreachableObjectsResponse, error) { - go func() { - m.pruneUnreachableObjectsChan <- in - }() - return &gitalypb.PruneUnreachableObjectsResponse{}, nil -} - -func (m *mockServer) PackRefs(ctx context.Context, in *gitalypb.PackRefsRequest) (*gitalypb.PackRefsResponse, error) { - go func() { - m.packRefsChan <- in - }() - return &gitalypb.PackRefsResponse{}, nil -} - -func runMockRepositoryServer(t *testing.T, cfg gconfig.Cfg) (*mockServer, string) { - mockServer := newMockRepositoryServer() - - addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { - gitalypb.RegisterRepositoryServiceServer(srv, mockServer) - gitalypb.RegisterRefServiceServer(srv, mockServer) - }, testserver.WithDisablePraefect()) - return mockServer, addr -} - -func waitForRequest(t *testing.T, ch chan proto.Message, expected proto.Message, timeout time.Duration) { - timer := time.NewTimer(timeout) - defer timer.Stop() - select { - case req := <-ch: - testhelper.ProtoEqual(t, expected, req) - close(ch) - case <-timer.C: - t.Fatal("timed out") - } -} - func TestConfirmReplication(t *testing.T) { ctx := testhelper.Context(t) |