Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2022-04-25 13:13:21 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2022-04-27 10:15:46 +0300
commit64d9727a224d703da8d1afbabfd95573af6ef212 (patch)
treee1e1a520e421a008bafdaa61cd2d275a00126af1
parent813f937448f3e5bf859a137bb487828db9085213 (diff)
replicator: Refactor tests exercising replication of maintenance RPCspks-praefect-remove-maintenance-event-creation
The replicator used to host logic to replicate maintenance-style RPCs, and thus it also had some tests which exercised this logic. Now that replication of those RPCs has been removed in favor of a best-effort routing strategy, we're essentially not testing the replicator anymore though, but the coordinator. Move the test into the coordinator's test suite. While at it, refactor it to be table-driven so that it's more readable and convert the setup to use `runPraefectServer()`.
-rw-r--r--internal/praefect/coordinator_test.go289
-rw-r--r--internal/praefect/replicator_test.go345
2 files changed, 289 insertions, 345 deletions
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 3b4d53d15..fce4d5e5b 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -19,6 +19,8 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/client"
"gitlab.com/gitlab-org/gitaly/v14/internal/cache"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
+ gconfig "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
gitaly_metadata "gitlab.com/gitlab-org/gitaly/v14/internal/metadata"
@@ -549,6 +551,293 @@ func TestStreamDirector_maintenance(t *testing.T) {
}
}
+func TestStreamDirector_maintenanceRPCs(t *testing.T) {
+ t.Parallel()
+
+ ctx := testhelper.Context(t)
+
+ primaryStorage := "internal-gitaly-0"
+ primaryServer, primarySocketPath := runMockMaintenanceServer(
+ t, testcfg.Build(t, testcfg.WithStorages(primaryStorage)),
+ )
+
+ secondaryStorage := "internal-gitaly-1"
+ secondaryServer, secondarySocketPath := runMockMaintenanceServer(t, testcfg.Build(
+ t, testcfg.WithStorages(secondaryStorage)),
+ )
+
+ cc, _, cleanup := runPraefectServer(t, ctx, config.Config{
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: "default",
+ Nodes: []*config.Node{
+ {
+ Storage: primaryStorage,
+ Address: primarySocketPath,
+ },
+ {
+ Storage: secondaryStorage,
+ Address: secondarySocketPath,
+ },
+ },
+ },
+ },
+ }, buildOptions{})
+ defer cleanup()
+
+ repository := &gitalypb.Repository{
+ StorageName: "default",
+ RelativePath: gittest.NewRepositoryName(t, true),
+ }
+ primaryRepository := &gitalypb.Repository{
+ StorageName: primaryStorage,
+ RelativePath: repository.RelativePath,
+ }
+ secondaryRepository := &gitalypb.Repository{
+ StorageName: secondaryStorage,
+ RelativePath: repository.RelativePath,
+ }
+
+ repositoryClient := gitalypb.NewRepositoryServiceClient(cc)
+ refClient := gitalypb.NewRefServiceClient(cc)
+
+ for _, tc := range []struct {
+ desc string
+ maintenanceFunc func(t *testing.T)
+ expectedPrimaryRequest proto.Message
+ expectedSecondaryRequest proto.Message
+ }{
+ {
+ desc: "GarbageCollect",
+ maintenanceFunc: func(t *testing.T) {
+ //nolint:staticcheck
+ _, err := repositoryClient.GarbageCollect(ctx, &gitalypb.GarbageCollectRequest{
+ Repository: repository,
+ CreateBitmap: true,
+ Prune: true,
+ })
+ require.NoError(t, err)
+ },
+ expectedPrimaryRequest: &gitalypb.GarbageCollectRequest{
+ Repository: primaryRepository,
+ CreateBitmap: true,
+ Prune: true,
+ },
+ expectedSecondaryRequest: &gitalypb.GarbageCollectRequest{
+ Repository: secondaryRepository,
+ CreateBitmap: true,
+ Prune: true,
+ },
+ },
+ {
+ desc: "RepackFull",
+ maintenanceFunc: func(t *testing.T) {
+ //nolint:staticcheck
+ _, err := repositoryClient.RepackFull(ctx, &gitalypb.RepackFullRequest{
+ Repository: repository,
+ CreateBitmap: true,
+ })
+ require.NoError(t, err)
+ },
+ expectedPrimaryRequest: &gitalypb.RepackFullRequest{
+ Repository: primaryRepository,
+ CreateBitmap: true,
+ },
+ expectedSecondaryRequest: &gitalypb.RepackFullRequest{
+ Repository: secondaryRepository,
+ CreateBitmap: true,
+ },
+ },
+ {
+ desc: "RepackIncremental",
+ maintenanceFunc: func(t *testing.T) {
+ //nolint:staticcheck
+ _, err := repositoryClient.RepackIncremental(ctx, &gitalypb.RepackIncrementalRequest{
+ Repository: repository,
+ })
+ require.NoError(t, err)
+ },
+ expectedPrimaryRequest: &gitalypb.RepackIncrementalRequest{
+ Repository: primaryRepository,
+ },
+ expectedSecondaryRequest: &gitalypb.RepackIncrementalRequest{
+ Repository: secondaryRepository,
+ },
+ },
+ {
+ desc: "Cleanup",
+ maintenanceFunc: func(t *testing.T) {
+ //nolint:staticcheck
+ _, err := repositoryClient.Cleanup(ctx, &gitalypb.CleanupRequest{
+ Repository: repository,
+ })
+ require.NoError(t, err)
+ },
+ expectedPrimaryRequest: &gitalypb.CleanupRequest{
+ Repository: primaryRepository,
+ },
+ expectedSecondaryRequest: &gitalypb.CleanupRequest{
+ Repository: secondaryRepository,
+ },
+ },
+ {
+ desc: "WriteCommitGraph",
+ maintenanceFunc: func(t *testing.T) {
+ //nolint:staticcheck
+ _, err := repositoryClient.WriteCommitGraph(ctx, &gitalypb.WriteCommitGraphRequest{
+ Repository: repository,
+ // This is not a valid split strategy, but we currently only support a
+ // single default split strategy with value 0. So we just test with an
+ // invalid split strategy to check that a non-default value gets properly
+ // replicated.
+ SplitStrategy: 1,
+ })
+ require.NoError(t, err)
+ },
+ expectedPrimaryRequest: &gitalypb.WriteCommitGraphRequest{
+ Repository: primaryRepository,
+ SplitStrategy: 1,
+ },
+ expectedSecondaryRequest: &gitalypb.WriteCommitGraphRequest{
+ Repository: secondaryRepository,
+ SplitStrategy: 1,
+ },
+ },
+ {
+ desc: "MidxRepack",
+ maintenanceFunc: func(t *testing.T) {
+ //nolint:staticcheck
+ _, err := repositoryClient.MidxRepack(ctx, &gitalypb.MidxRepackRequest{
+ Repository: repository,
+ })
+ require.NoError(t, err)
+ },
+ expectedPrimaryRequest: &gitalypb.MidxRepackRequest{
+ Repository: primaryRepository,
+ },
+ expectedSecondaryRequest: &gitalypb.MidxRepackRequest{
+ Repository: secondaryRepository,
+ },
+ },
+ {
+ desc: "OptimizeRepository",
+ maintenanceFunc: func(t *testing.T) {
+ _, err := repositoryClient.OptimizeRepository(ctx, &gitalypb.OptimizeRepositoryRequest{
+ Repository: repository,
+ })
+ require.NoError(t, err)
+ },
+ expectedPrimaryRequest: &gitalypb.OptimizeRepositoryRequest{
+ Repository: primaryRepository,
+ },
+ expectedSecondaryRequest: &gitalypb.OptimizeRepositoryRequest{
+ Repository: secondaryRepository,
+ },
+ },
+ {
+ desc: "PruneUnreachableObjects",
+ maintenanceFunc: func(t *testing.T) {
+ _, err := repositoryClient.PruneUnreachableObjects(ctx, &gitalypb.PruneUnreachableObjectsRequest{
+ Repository: repository,
+ })
+ require.NoError(t, err)
+ },
+ expectedPrimaryRequest: &gitalypb.PruneUnreachableObjectsRequest{
+ Repository: primaryRepository,
+ },
+ expectedSecondaryRequest: &gitalypb.PruneUnreachableObjectsRequest{
+ Repository: secondaryRepository,
+ },
+ },
+ {
+ desc: "PackRefs",
+ maintenanceFunc: func(t *testing.T) {
+ //nolint:staticcheck
+ _, err := refClient.PackRefs(ctx, &gitalypb.PackRefsRequest{
+ Repository: repository,
+ })
+ require.NoError(t, err)
+ },
+ expectedPrimaryRequest: &gitalypb.PackRefsRequest{
+ Repository: primaryRepository,
+ },
+ expectedSecondaryRequest: &gitalypb.PackRefsRequest{
+ Repository: secondaryRepository,
+ },
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ tc.maintenanceFunc(t)
+ testhelper.ProtoEqual(t, tc.expectedPrimaryRequest, <-primaryServer.requestCh)
+ testhelper.ProtoEqual(t, tc.expectedSecondaryRequest, <-secondaryServer.requestCh)
+ })
+ }
+}
+
+type mockMaintenanceServer struct {
+ requestCh chan proto.Message
+ gitalypb.UnimplementedRepositoryServiceServer
+ gitalypb.UnimplementedRefServiceServer
+}
+
+func runMockMaintenanceServer(t *testing.T, cfg gconfig.Cfg) (*mockMaintenanceServer, string) {
+ server := &mockMaintenanceServer{
+ requestCh: make(chan proto.Message, 1),
+ }
+
+ addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) {
+ gitalypb.RegisterRepositoryServiceServer(srv, server)
+ gitalypb.RegisterRefServiceServer(srv, server)
+ }, testserver.WithDisablePraefect())
+
+ return server, addr
+}
+
+func (m *mockMaintenanceServer) GarbageCollect(ctx context.Context, in *gitalypb.GarbageCollectRequest) (*gitalypb.GarbageCollectResponse, error) {
+ m.requestCh <- in
+ return &gitalypb.GarbageCollectResponse{}, nil
+}
+
+func (m *mockMaintenanceServer) RepackFull(ctx context.Context, in *gitalypb.RepackFullRequest) (*gitalypb.RepackFullResponse, error) {
+ m.requestCh <- in
+ return &gitalypb.RepackFullResponse{}, nil
+}
+
+func (m *mockMaintenanceServer) RepackIncremental(ctx context.Context, in *gitalypb.RepackIncrementalRequest) (*gitalypb.RepackIncrementalResponse, error) {
+ m.requestCh <- in
+ return &gitalypb.RepackIncrementalResponse{}, nil
+}
+
+func (m *mockMaintenanceServer) Cleanup(ctx context.Context, in *gitalypb.CleanupRequest) (*gitalypb.CleanupResponse, error) {
+ m.requestCh <- in
+ return &gitalypb.CleanupResponse{}, nil
+}
+
+func (m *mockMaintenanceServer) WriteCommitGraph(ctx context.Context, in *gitalypb.WriteCommitGraphRequest) (*gitalypb.WriteCommitGraphResponse, error) {
+ m.requestCh <- in
+ return &gitalypb.WriteCommitGraphResponse{}, nil
+}
+
+func (m *mockMaintenanceServer) MidxRepack(ctx context.Context, in *gitalypb.MidxRepackRequest) (*gitalypb.MidxRepackResponse, error) {
+ m.requestCh <- in
+ return &gitalypb.MidxRepackResponse{}, nil
+}
+
+func (m *mockMaintenanceServer) OptimizeRepository(ctx context.Context, in *gitalypb.OptimizeRepositoryRequest) (*gitalypb.OptimizeRepositoryResponse, error) {
+ m.requestCh <- in
+ return &gitalypb.OptimizeRepositoryResponse{}, nil
+}
+
+func (m *mockMaintenanceServer) PruneUnreachableObjects(ctx context.Context, in *gitalypb.PruneUnreachableObjectsRequest) (*gitalypb.PruneUnreachableObjectsResponse, error) {
+ m.requestCh <- in
+ return &gitalypb.PruneUnreachableObjectsResponse{}, nil
+}
+
+func (m *mockMaintenanceServer) PackRefs(ctx context.Context, in *gitalypb.PackRefsRequest) (*gitalypb.PackRefsResponse, error) {
+ m.requestCh <- in
+ return &gitalypb.PackRefsResponse{}, nil
+}
+
type mockRouter struct {
Router
routeRepositoryAccessorFunc func(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RepositoryAccessorRoute, error)
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 89e5d342c..dd91d20b8 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -2,7 +2,6 @@ package praefect
import (
"context"
- "fmt"
"path/filepath"
"strings"
"sync"
@@ -22,7 +21,6 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/objectpool"
gconfig "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
- "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction"
@@ -32,7 +30,6 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/transactions"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/promtest"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
@@ -279,348 +276,6 @@ func TestReplicatorDowngradeAttempt(t *testing.T) {
}
}
-func TestReplicator_PropagateReplicationJob(t *testing.T) {
- t.Parallel()
-
- ctx := testhelper.Context(t)
- primaryStorage, secondaryStorage := "internal-gitaly-0", "internal-gitaly-1"
-
- primCfg := testcfg.Build(t, testcfg.WithStorages(primaryStorage))
- primaryServer, primarySocketPath := runMockRepositoryServer(t, primCfg)
-
- secCfg := testcfg.Build(t, testcfg.WithStorages(secondaryStorage))
- secondaryServer, secondarySocketPath := runMockRepositoryServer(t, secCfg)
-
- conf := config.Config{
- VirtualStorages: []*config.VirtualStorage{
- {
- Name: "default",
- Nodes: []*config.Node{
- {
- Storage: primaryStorage,
- Address: primarySocketPath,
- },
- {
- Storage: secondaryStorage,
- Address: secondarySocketPath,
- },
- },
- },
- },
- }
-
- // We need to await for the replication event to make a complete roundtrip to the remote.
- // Because send to the channel happens during in-flight request there are ongoing filesystem
- // operations related to caching. The cleanup happens before all IO cache operations finished
- // those resulting to:
- // unlinkat /tmp/gitaly-222007427/381349228/storages.d/internal-gitaly-1/+gitaly/state/path/to/repo: directory not empty
- // By using WaitGroup we are sure the test cleanup will be started after all replication
- // requests are completed, so no running cache IO operations happen.
- queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t)))
-
- var wg sync.WaitGroup
- // When maintenance operation routing is enabled we don't expect to see any
- // replication events. The observed behaviour should still be the same though: we
- // expect to observe the RPC calls on both the primary and secondary node because we
- // route them to both at the same time.
- queue.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
- require.FailNow(t, "no replication jobs should have been created")
- return datastore.ReplicationEvent{}, fmt.Errorf("unexpected enqueue")
- })
-
- logEntry := testhelper.NewDiscardingLogEntry(t)
-
- nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
- require.NoError(t, err)
- nodeMgr.Start(0, time.Hour)
- defer nodeMgr.Stop()
-
- txMgr := transactions.NewManager(conf)
-
- repositoryRelativePath := "/path/to/repo"
-
- rs := datastore.MockRepositoryStore{
- GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) {
- return repositoryRelativePath, nil, nil
- },
- GetReplicaPathFunc: func(ctx context.Context, repositoryID int64) (string, error) {
- return repositoryRelativePath, nil
- },
- }
-
- coordinator := NewCoordinator(
- queue,
- rs,
- NewNodeManagerRouter(nodeMgr, rs),
- txMgr,
- conf,
- protoregistry.GitalyProtoPreregistered,
- )
-
- replmgr := NewReplMgr(logEntry, conf.StorageNames(), queue, rs, nodeMgr, NodeSetFromNodeManager(nodeMgr))
-
- prf := NewGRPCServer(conf, logEntry, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, txMgr, rs, nil, nil, nil, nil)
-
- listener, port := listenAvailPort(t)
- ctx, cancel := context.WithCancel(ctx)
-
- go prf.Serve(listener)
- defer prf.Stop()
-
- cc := dialLocalPort(t, port, false)
- repositoryClient := gitalypb.NewRepositoryServiceClient(cc)
- refClient := gitalypb.NewRefServiceClient(cc)
- defer listener.Close()
- defer cc.Close()
-
- repository := &gitalypb.Repository{
- StorageName: conf.VirtualStorages[0].Name,
- RelativePath: repositoryRelativePath,
- }
-
- //nolint:staticcheck
- _, err = repositoryClient.GarbageCollect(ctx, &gitalypb.GarbageCollectRequest{
- Repository: repository,
- CreateBitmap: true,
- Prune: true,
- })
- require.NoError(t, err)
-
- //nolint:staticcheck
- _, err = repositoryClient.RepackFull(ctx, &gitalypb.RepackFullRequest{Repository: repository, CreateBitmap: false})
- require.NoError(t, err)
-
- //nolint:staticcheck
- _, err = repositoryClient.RepackIncremental(ctx, &gitalypb.RepackIncrementalRequest{Repository: repository})
- require.NoError(t, err)
-
- //nolint:staticcheck
- _, err = repositoryClient.Cleanup(ctx, &gitalypb.CleanupRequest{Repository: repository})
- require.NoError(t, err)
-
- //nolint:staticcheck
- _, err = repositoryClient.WriteCommitGraph(ctx, &gitalypb.WriteCommitGraphRequest{
- Repository: repository,
- // This is not a valid split strategy, but we currently only support a
- // single default split strategy with value 0. So we just test with an
- // invalid split strategy to check that a non-default value gets properly
- // replicated.
- SplitStrategy: 1,
- })
- require.NoError(t, err)
-
- //nolint:staticcheck
- _, err = repositoryClient.MidxRepack(ctx, &gitalypb.MidxRepackRequest{Repository: repository})
- require.NoError(t, err)
-
- _, err = repositoryClient.OptimizeRepository(ctx, &gitalypb.OptimizeRepositoryRequest{Repository: repository})
- require.NoError(t, err)
-
- _, err = repositoryClient.PruneUnreachableObjects(ctx, &gitalypb.PruneUnreachableObjectsRequest{Repository: repository})
- require.NoError(t, err)
-
- //nolint:staticcheck
- _, err = refClient.PackRefs(ctx, &gitalypb.PackRefsRequest{
- Repository: repository,
- })
- require.NoError(t, err)
-
- primaryRepository := &gitalypb.Repository{StorageName: primaryStorage, RelativePath: repositoryRelativePath}
- expectedPrimaryGcReq := &gitalypb.GarbageCollectRequest{
- Repository: primaryRepository,
- CreateBitmap: true,
- Prune: true,
- }
- expectedPrimaryRepackFullReq := &gitalypb.RepackFullRequest{
- Repository: primaryRepository,
- CreateBitmap: false,
- }
- expectedPrimaryRepackIncrementalReq := &gitalypb.RepackIncrementalRequest{
- Repository: primaryRepository,
- }
- expectedPrimaryCleanup := &gitalypb.CleanupRequest{
- Repository: primaryRepository,
- }
- expectedPrimaryWriteCommitGraph := &gitalypb.WriteCommitGraphRequest{
- Repository: primaryRepository,
- SplitStrategy: 1,
- }
- expectedPrimaryMidxRepack := &gitalypb.MidxRepackRequest{
- Repository: primaryRepository,
- }
- expectedPrimaryOptimizeRepository := &gitalypb.OptimizeRepositoryRequest{
- Repository: primaryRepository,
- }
- expectedPruneUnreachableObjects := &gitalypb.PruneUnreachableObjectsRequest{
- Repository: primaryRepository,
- }
- expectedPrimaryPackRefs := &gitalypb.PackRefsRequest{
- Repository: primaryRepository,
- }
-
- replMgrDone := startProcessBacklog(ctx, replmgr)
-
- // ensure primary gitaly server received the expected requests
- waitForRequest(t, primaryServer.gcChan, expectedPrimaryGcReq, 5*time.Second)
- waitForRequest(t, primaryServer.repackIncrChan, expectedPrimaryRepackIncrementalReq, 5*time.Second)
- waitForRequest(t, primaryServer.repackFullChan, expectedPrimaryRepackFullReq, 5*time.Second)
- waitForRequest(t, primaryServer.cleanupChan, expectedPrimaryCleanup, 5*time.Second)
- waitForRequest(t, primaryServer.writeCommitGraphChan, expectedPrimaryWriteCommitGraph, 5*time.Second)
- waitForRequest(t, primaryServer.midxRepackChan, expectedPrimaryMidxRepack, 5*time.Second)
- waitForRequest(t, primaryServer.optimizeRepositoryChan, expectedPrimaryOptimizeRepository, 5*time.Second)
- waitForRequest(t, primaryServer.pruneUnreachableObjectsChan, expectedPruneUnreachableObjects, 5*time.Second)
- waitForRequest(t, primaryServer.packRefsChan, expectedPrimaryPackRefs, 5*time.Second)
-
- secondaryRepository := &gitalypb.Repository{StorageName: secondaryStorage, RelativePath: repositoryRelativePath}
-
- expectedSecondaryGcReq := expectedPrimaryGcReq
- expectedSecondaryGcReq.Repository = secondaryRepository
-
- expectedSecondaryRepackFullReq := expectedPrimaryRepackFullReq
- expectedSecondaryRepackFullReq.Repository = secondaryRepository
-
- expectedSecondaryRepackIncrementalReq := expectedPrimaryRepackIncrementalReq
- expectedSecondaryRepackIncrementalReq.Repository = secondaryRepository
-
- expectedSecondaryCleanup := expectedPrimaryCleanup
- expectedSecondaryCleanup.Repository = secondaryRepository
-
- expectedSecondaryWriteCommitGraph := expectedPrimaryWriteCommitGraph
- expectedSecondaryWriteCommitGraph.Repository = secondaryRepository
-
- expectedSecondaryMidxRepack := expectedPrimaryMidxRepack
- expectedSecondaryMidxRepack.Repository = secondaryRepository
-
- expectedSecondaryOptimizeRepository := expectedPrimaryOptimizeRepository
- expectedSecondaryOptimizeRepository.Repository = secondaryRepository
-
- expectedSecondaryPruneUnreachableObjects := expectedPruneUnreachableObjects
- expectedSecondaryPruneUnreachableObjects.Repository = secondaryRepository
-
- expectedSecondaryPackRefs := expectedPrimaryPackRefs
- expectedSecondaryPackRefs.Repository = secondaryRepository
-
- // ensure secondary gitaly server received the expected requests
- waitForRequest(t, secondaryServer.gcChan, expectedSecondaryGcReq, 5*time.Second)
- waitForRequest(t, secondaryServer.repackIncrChan, expectedSecondaryRepackIncrementalReq, 5*time.Second)
- waitForRequest(t, secondaryServer.repackFullChan, expectedSecondaryRepackFullReq, 5*time.Second)
- waitForRequest(t, secondaryServer.cleanupChan, expectedSecondaryCleanup, 5*time.Second)
- waitForRequest(t, secondaryServer.writeCommitGraphChan, expectedSecondaryWriteCommitGraph, 5*time.Second)
- waitForRequest(t, secondaryServer.midxRepackChan, expectedSecondaryMidxRepack, 5*time.Second)
- waitForRequest(t, secondaryServer.optimizeRepositoryChan, expectedSecondaryOptimizeRepository, 5*time.Second)
- waitForRequest(t, secondaryServer.pruneUnreachableObjectsChan, expectedSecondaryPruneUnreachableObjects, 5*time.Second)
- waitForRequest(t, secondaryServer.packRefsChan, expectedSecondaryPackRefs, 5*time.Second)
- wg.Wait()
- cancel()
- <-replMgrDone
-}
-
-type mockServer struct {
- gcChan, repackFullChan, repackIncrChan, cleanupChan, writeCommitGraphChan, midxRepackChan, optimizeRepositoryChan, pruneUnreachableObjectsChan, packRefsChan chan proto.Message
-
- gitalypb.UnimplementedRepositoryServiceServer
- gitalypb.UnimplementedRefServiceServer
-}
-
-func newMockRepositoryServer() *mockServer {
- return &mockServer{
- gcChan: make(chan proto.Message),
- repackFullChan: make(chan proto.Message),
- repackIncrChan: make(chan proto.Message),
- cleanupChan: make(chan proto.Message),
- writeCommitGraphChan: make(chan proto.Message),
- midxRepackChan: make(chan proto.Message),
- optimizeRepositoryChan: make(chan proto.Message),
- pruneUnreachableObjectsChan: make(chan proto.Message),
- packRefsChan: make(chan proto.Message),
- }
-}
-
-func (m *mockServer) GarbageCollect(ctx context.Context, in *gitalypb.GarbageCollectRequest) (*gitalypb.GarbageCollectResponse, error) {
- go func() {
- m.gcChan <- in
- }()
- return &gitalypb.GarbageCollectResponse{}, nil
-}
-
-func (m *mockServer) RepackFull(ctx context.Context, in *gitalypb.RepackFullRequest) (*gitalypb.RepackFullResponse, error) {
- go func() {
- m.repackFullChan <- in
- }()
- return &gitalypb.RepackFullResponse{}, nil
-}
-
-func (m *mockServer) RepackIncremental(ctx context.Context, in *gitalypb.RepackIncrementalRequest) (*gitalypb.RepackIncrementalResponse, error) {
- go func() {
- m.repackIncrChan <- in
- }()
- return &gitalypb.RepackIncrementalResponse{}, nil
-}
-
-func (m *mockServer) Cleanup(ctx context.Context, in *gitalypb.CleanupRequest) (*gitalypb.CleanupResponse, error) {
- go func() {
- m.cleanupChan <- in
- }()
- return &gitalypb.CleanupResponse{}, nil
-}
-
-func (m *mockServer) WriteCommitGraph(ctx context.Context, in *gitalypb.WriteCommitGraphRequest) (*gitalypb.WriteCommitGraphResponse, error) {
- go func() {
- m.writeCommitGraphChan <- in
- }()
- return &gitalypb.WriteCommitGraphResponse{}, nil
-}
-
-func (m *mockServer) MidxRepack(ctx context.Context, in *gitalypb.MidxRepackRequest) (*gitalypb.MidxRepackResponse, error) {
- go func() {
- m.midxRepackChan <- in
- }()
- return &gitalypb.MidxRepackResponse{}, nil
-}
-
-func (m *mockServer) OptimizeRepository(ctx context.Context, in *gitalypb.OptimizeRepositoryRequest) (*gitalypb.OptimizeRepositoryResponse, error) {
- go func() {
- m.optimizeRepositoryChan <- in
- }()
- return &gitalypb.OptimizeRepositoryResponse{}, nil
-}
-
-func (m *mockServer) PruneUnreachableObjects(ctx context.Context, in *gitalypb.PruneUnreachableObjectsRequest) (*gitalypb.PruneUnreachableObjectsResponse, error) {
- go func() {
- m.pruneUnreachableObjectsChan <- in
- }()
- return &gitalypb.PruneUnreachableObjectsResponse{}, nil
-}
-
-func (m *mockServer) PackRefs(ctx context.Context, in *gitalypb.PackRefsRequest) (*gitalypb.PackRefsResponse, error) {
- go func() {
- m.packRefsChan <- in
- }()
- return &gitalypb.PackRefsResponse{}, nil
-}
-
-func runMockRepositoryServer(t *testing.T, cfg gconfig.Cfg) (*mockServer, string) {
- mockServer := newMockRepositoryServer()
-
- addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) {
- gitalypb.RegisterRepositoryServiceServer(srv, mockServer)
- gitalypb.RegisterRefServiceServer(srv, mockServer)
- }, testserver.WithDisablePraefect())
- return mockServer, addr
-}
-
-func waitForRequest(t *testing.T, ch chan proto.Message, expected proto.Message, timeout time.Duration) {
- timer := time.NewTimer(timeout)
- defer timer.Stop()
- select {
- case req := <-ch:
- testhelper.ProtoEqual(t, expected, req)
- close(ch)
- case <-timer.C:
- t.Fatal("timed out")
- }
-}
-
func TestConfirmReplication(t *testing.T) {
ctx := testhelper.Context(t)