diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2022-02-11 16:32:26 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2022-02-14 10:29:26 +0300 |
commit | 913345ca8878eaa63c68a7f2f765bb07d384c51b (patch) | |
tree | da8bca91bf3bfef4f6cacdd6b927e293db8c2372 | |
parent | 67c4cdb56adf78cdb9d54ca3de83c288e23ea3a2 (diff) |
praefect: Implement replication for PruneUnreachableObjects
Mutating maintenance-style RPCs have special handling in the coordinator
and replicator. Implement it for the new PruneUnreachableObjects RPC.
-rw-r--r-- | internal/praefect/coordinator.go | 6 | ||||
-rw-r--r-- | internal/praefect/datastore/datastore.go | 2 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 21 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 37 |
4 files changed, 57 insertions, 9 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index faab1fac9..4270c8e82 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -227,6 +227,12 @@ func getReplicationDetails(methodName string, m proto.Message) (datastore.Change return "", nil, fmt.Errorf("protocol changed: for method %q expected message type '%T', got '%T'", methodName, req, m) } return datastore.OptimizeRepository, nil, nil + case "/gitaly.RepositoryService/PruneUnreachableObjects": + req, ok := m.(*gitalypb.PruneUnreachableObjectsRequest) + if !ok { + return "", nil, fmt.Errorf("protocol changed: for method %q expected message type '%T', got '%T'", methodName, req, m) + } + return datastore.PruneUnreachableObjects, nil, nil case "/gitaly.RefService/PackRefs": req, ok := m.(*gitalypb.PackRefsRequest) if !ok { diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go index 2766c9574..2e7497e54 100644 --- a/internal/praefect/datastore/datastore.go +++ b/internal/praefect/datastore/datastore.go @@ -63,6 +63,8 @@ const ( MidxRepack = ChangeType("midx_repack") // OptimizeRepository is when replication optimizes a repository OptimizeRepository = ChangeType("optimize_repository") + // PruneUnreachableObjects is when replication prunes unreachable objects in a repository + PruneUnreachableObjects = ChangeType("prune_unreachable_objects") ) func (ct ChangeType) String() string { diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index a6e51c329..1bfc33745 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -46,6 +46,8 @@ type Replicator interface { MidxRepack(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error // OptimizeRepository will optimize the target repository OptimizeRepository(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error + // PruneUnreachableObjects prunes unreachable objects from the target repository + PruneUnreachableObjects(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error } type defaultReplicator struct { @@ -360,6 +362,23 @@ func (dr defaultReplicator) OptimizeRepository(ctx context.Context, event datast return nil } +func (dr defaultReplicator) PruneUnreachableObjects(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error { + targetRepo := &gitalypb.Repository{ + StorageName: event.Job.TargetNodeStorage, + RelativePath: event.Job.ReplicaPath, + } + + repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC) + + if _, err := repoSvcClient.PruneUnreachableObjects(ctx, &gitalypb.PruneUnreachableObjectsRequest{ + Repository: targetRepo, + }); err != nil { + return err + } + + return nil +} + func (dr defaultReplicator) RepackFull(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error { targetRepo := &gitalypb.Repository{ StorageName: event.Job.TargetNodeStorage, @@ -840,6 +859,8 @@ func (r ReplMgr) processReplicationEvent(ctx context.Context, event datastore.Re err = r.replicator.MidxRepack(ctx, event, targetCC) case datastore.OptimizeRepository: err = r.replicator.OptimizeRepository(ctx, event, targetCC) + case datastore.PruneUnreachableObjects: + err = r.replicator.PruneUnreachableObjects(ctx, event, targetCC) default: err = fmt.Errorf("unknown replication change type encountered: %q", event.Job.Change) } diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 4a409615d..3a6cad4fa 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -400,6 +400,9 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) { _, err = repositoryClient.OptimizeRepository(ctx, &gitalypb.OptimizeRepositoryRequest{Repository: repository}) require.NoError(t, err) + _, err = repositoryClient.PruneUnreachableObjects(ctx, &gitalypb.PruneUnreachableObjectsRequest{Repository: repository}) + require.NoError(t, err) + _, err = refClient.PackRefs(ctx, &gitalypb.PackRefsRequest{ Repository: repository, }) @@ -431,6 +434,9 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) { expectedPrimaryOptimizeRepository := &gitalypb.OptimizeRepositoryRequest{ Repository: primaryRepository, } + expectedPruneUnreachableObjects := &gitalypb.PruneUnreachableObjectsRequest{ + Repository: primaryRepository, + } expectedPrimaryPackRefs := &gitalypb.PackRefsRequest{ Repository: primaryRepository, } @@ -445,6 +451,7 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) { waitForRequest(t, primaryServer.writeCommitGraphChan, expectedPrimaryWriteCommitGraph, 5*time.Second) waitForRequest(t, primaryServer.midxRepackChan, expectedPrimaryMidxRepack, 5*time.Second) waitForRequest(t, primaryServer.optimizeRepositoryChan, expectedPrimaryOptimizeRepository, 5*time.Second) + waitForRequest(t, primaryServer.pruneUnreachableObjectsChan, expectedPruneUnreachableObjects, 5*time.Second) waitForRequest(t, primaryServer.packRefsChan, expectedPrimaryPackRefs, 5*time.Second) secondaryRepository := &gitalypb.Repository{StorageName: secondaryStorage, RelativePath: repositoryRelativePath} @@ -470,6 +477,9 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) { expectedSecondaryOptimizeRepository := expectedPrimaryOptimizeRepository expectedSecondaryOptimizeRepository.Repository = secondaryRepository + expectedSecondaryPruneUnreachableObjects := expectedPruneUnreachableObjects + expectedSecondaryPruneUnreachableObjects.Repository = secondaryRepository + expectedSecondaryPackRefs := expectedPrimaryPackRefs expectedSecondaryPackRefs.Repository = secondaryRepository @@ -481,6 +491,7 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) { waitForRequest(t, secondaryServer.writeCommitGraphChan, expectedSecondaryWriteCommitGraph, 5*time.Second) waitForRequest(t, secondaryServer.midxRepackChan, expectedSecondaryMidxRepack, 5*time.Second) waitForRequest(t, secondaryServer.optimizeRepositoryChan, expectedSecondaryOptimizeRepository, 5*time.Second) + waitForRequest(t, secondaryServer.pruneUnreachableObjectsChan, expectedSecondaryPruneUnreachableObjects, 5*time.Second) waitForRequest(t, secondaryServer.packRefsChan, expectedSecondaryPackRefs, 5*time.Second) wg.Wait() cancel() @@ -488,7 +499,7 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) { } type mockServer struct { - gcChan, repackFullChan, repackIncrChan, cleanupChan, writeCommitGraphChan, midxRepackChan, optimizeRepositoryChan, packRefsChan chan proto.Message + gcChan, repackFullChan, repackIncrChan, cleanupChan, writeCommitGraphChan, midxRepackChan, optimizeRepositoryChan, pruneUnreachableObjectsChan, packRefsChan chan proto.Message gitalypb.UnimplementedRepositoryServiceServer gitalypb.UnimplementedRefServiceServer @@ -496,14 +507,15 @@ type mockServer struct { func newMockRepositoryServer() *mockServer { return &mockServer{ - gcChan: make(chan proto.Message), - repackFullChan: make(chan proto.Message), - repackIncrChan: make(chan proto.Message), - cleanupChan: make(chan proto.Message), - writeCommitGraphChan: make(chan proto.Message), - midxRepackChan: make(chan proto.Message), - optimizeRepositoryChan: make(chan proto.Message), - packRefsChan: make(chan proto.Message), + gcChan: make(chan proto.Message), + repackFullChan: make(chan proto.Message), + repackIncrChan: make(chan proto.Message), + cleanupChan: make(chan proto.Message), + writeCommitGraphChan: make(chan proto.Message), + midxRepackChan: make(chan proto.Message), + optimizeRepositoryChan: make(chan proto.Message), + pruneUnreachableObjectsChan: make(chan proto.Message), + packRefsChan: make(chan proto.Message), } } @@ -556,6 +568,13 @@ func (m *mockServer) OptimizeRepository(ctx context.Context, in *gitalypb.Optimi return &gitalypb.OptimizeRepositoryResponse{}, nil } +func (m *mockServer) PruneUnreachableObjects(ctx context.Context, in *gitalypb.PruneUnreachableObjectsRequest) (*gitalypb.PruneUnreachableObjectsResponse, error) { + go func() { + m.pruneUnreachableObjectsChan <- in + }() + return &gitalypb.PruneUnreachableObjectsResponse{}, nil +} + func (m *mockServer) PackRefs(ctx context.Context, in *gitalypb.PackRefsRequest) (*gitalypb.PackRefsResponse, error) { go func() { m.packRefsChan <- in |