Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2022-02-11 16:32:26 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2022-02-14 10:29:26 +0300
commit913345ca8878eaa63c68a7f2f765bb07d384c51b (patch)
treeda8bca91bf3bfef4f6cacdd6b927e293db8c2372
parent67c4cdb56adf78cdb9d54ca3de83c288e23ea3a2 (diff)
praefect: Implement replication for PruneUnreachableObjects
Mutating maintenance-style RPCs have special handling in the coordinator and replicator. Implement it for the new PruneUnreachableObjects RPC.
-rw-r--r--internal/praefect/coordinator.go6
-rw-r--r--internal/praefect/datastore/datastore.go2
-rw-r--r--internal/praefect/replicator.go21
-rw-r--r--internal/praefect/replicator_test.go37
4 files changed, 57 insertions, 9 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index faab1fac9..4270c8e82 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -227,6 +227,12 @@ func getReplicationDetails(methodName string, m proto.Message) (datastore.Change
return "", nil, fmt.Errorf("protocol changed: for method %q expected message type '%T', got '%T'", methodName, req, m)
}
return datastore.OptimizeRepository, nil, nil
+ case "/gitaly.RepositoryService/PruneUnreachableObjects":
+ req, ok := m.(*gitalypb.PruneUnreachableObjectsRequest)
+ if !ok {
+ return "", nil, fmt.Errorf("protocol changed: for method %q expected message type '%T', got '%T'", methodName, req, m)
+ }
+ return datastore.PruneUnreachableObjects, nil, nil
case "/gitaly.RefService/PackRefs":
req, ok := m.(*gitalypb.PackRefsRequest)
if !ok {
diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go
index 2766c9574..2e7497e54 100644
--- a/internal/praefect/datastore/datastore.go
+++ b/internal/praefect/datastore/datastore.go
@@ -63,6 +63,8 @@ const (
MidxRepack = ChangeType("midx_repack")
// OptimizeRepository is when replication optimizes a repository
OptimizeRepository = ChangeType("optimize_repository")
+ // PruneUnreachableObjects is when replication prunes unreachable objects in a repository
+ PruneUnreachableObjects = ChangeType("prune_unreachable_objects")
)
func (ct ChangeType) String() string {
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index a6e51c329..1bfc33745 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -46,6 +46,8 @@ type Replicator interface {
MidxRepack(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error
// OptimizeRepository will optimize the target repository
OptimizeRepository(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error
+ // PruneUnreachableObjects prunes unreachable objects from the target repository
+ PruneUnreachableObjects(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error
}
type defaultReplicator struct {
@@ -360,6 +362,23 @@ func (dr defaultReplicator) OptimizeRepository(ctx context.Context, event datast
return nil
}
+func (dr defaultReplicator) PruneUnreachableObjects(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
+ targetRepo := &gitalypb.Repository{
+ StorageName: event.Job.TargetNodeStorage,
+ RelativePath: event.Job.ReplicaPath,
+ }
+
+ repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC)
+
+ if _, err := repoSvcClient.PruneUnreachableObjects(ctx, &gitalypb.PruneUnreachableObjectsRequest{
+ Repository: targetRepo,
+ }); err != nil {
+ return err
+ }
+
+ return nil
+}
+
func (dr defaultReplicator) RepackFull(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
targetRepo := &gitalypb.Repository{
StorageName: event.Job.TargetNodeStorage,
@@ -840,6 +859,8 @@ func (r ReplMgr) processReplicationEvent(ctx context.Context, event datastore.Re
err = r.replicator.MidxRepack(ctx, event, targetCC)
case datastore.OptimizeRepository:
err = r.replicator.OptimizeRepository(ctx, event, targetCC)
+ case datastore.PruneUnreachableObjects:
+ err = r.replicator.PruneUnreachableObjects(ctx, event, targetCC)
default:
err = fmt.Errorf("unknown replication change type encountered: %q", event.Job.Change)
}
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 4a409615d..3a6cad4fa 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -400,6 +400,9 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) {
_, err = repositoryClient.OptimizeRepository(ctx, &gitalypb.OptimizeRepositoryRequest{Repository: repository})
require.NoError(t, err)
+ _, err = repositoryClient.PruneUnreachableObjects(ctx, &gitalypb.PruneUnreachableObjectsRequest{Repository: repository})
+ require.NoError(t, err)
+
_, err = refClient.PackRefs(ctx, &gitalypb.PackRefsRequest{
Repository: repository,
})
@@ -431,6 +434,9 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) {
expectedPrimaryOptimizeRepository := &gitalypb.OptimizeRepositoryRequest{
Repository: primaryRepository,
}
+ expectedPruneUnreachableObjects := &gitalypb.PruneUnreachableObjectsRequest{
+ Repository: primaryRepository,
+ }
expectedPrimaryPackRefs := &gitalypb.PackRefsRequest{
Repository: primaryRepository,
}
@@ -445,6 +451,7 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) {
waitForRequest(t, primaryServer.writeCommitGraphChan, expectedPrimaryWriteCommitGraph, 5*time.Second)
waitForRequest(t, primaryServer.midxRepackChan, expectedPrimaryMidxRepack, 5*time.Second)
waitForRequest(t, primaryServer.optimizeRepositoryChan, expectedPrimaryOptimizeRepository, 5*time.Second)
+ waitForRequest(t, primaryServer.pruneUnreachableObjectsChan, expectedPruneUnreachableObjects, 5*time.Second)
waitForRequest(t, primaryServer.packRefsChan, expectedPrimaryPackRefs, 5*time.Second)
secondaryRepository := &gitalypb.Repository{StorageName: secondaryStorage, RelativePath: repositoryRelativePath}
@@ -470,6 +477,9 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) {
expectedSecondaryOptimizeRepository := expectedPrimaryOptimizeRepository
expectedSecondaryOptimizeRepository.Repository = secondaryRepository
+ expectedSecondaryPruneUnreachableObjects := expectedPruneUnreachableObjects
+ expectedSecondaryPruneUnreachableObjects.Repository = secondaryRepository
+
expectedSecondaryPackRefs := expectedPrimaryPackRefs
expectedSecondaryPackRefs.Repository = secondaryRepository
@@ -481,6 +491,7 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) {
waitForRequest(t, secondaryServer.writeCommitGraphChan, expectedSecondaryWriteCommitGraph, 5*time.Second)
waitForRequest(t, secondaryServer.midxRepackChan, expectedSecondaryMidxRepack, 5*time.Second)
waitForRequest(t, secondaryServer.optimizeRepositoryChan, expectedSecondaryOptimizeRepository, 5*time.Second)
+ waitForRequest(t, secondaryServer.pruneUnreachableObjectsChan, expectedSecondaryPruneUnreachableObjects, 5*time.Second)
waitForRequest(t, secondaryServer.packRefsChan, expectedSecondaryPackRefs, 5*time.Second)
wg.Wait()
cancel()
@@ -488,7 +499,7 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) {
}
type mockServer struct {
- gcChan, repackFullChan, repackIncrChan, cleanupChan, writeCommitGraphChan, midxRepackChan, optimizeRepositoryChan, packRefsChan chan proto.Message
+ gcChan, repackFullChan, repackIncrChan, cleanupChan, writeCommitGraphChan, midxRepackChan, optimizeRepositoryChan, pruneUnreachableObjectsChan, packRefsChan chan proto.Message
gitalypb.UnimplementedRepositoryServiceServer
gitalypb.UnimplementedRefServiceServer
@@ -496,14 +507,15 @@ type mockServer struct {
func newMockRepositoryServer() *mockServer {
return &mockServer{
- gcChan: make(chan proto.Message),
- repackFullChan: make(chan proto.Message),
- repackIncrChan: make(chan proto.Message),
- cleanupChan: make(chan proto.Message),
- writeCommitGraphChan: make(chan proto.Message),
- midxRepackChan: make(chan proto.Message),
- optimizeRepositoryChan: make(chan proto.Message),
- packRefsChan: make(chan proto.Message),
+ gcChan: make(chan proto.Message),
+ repackFullChan: make(chan proto.Message),
+ repackIncrChan: make(chan proto.Message),
+ cleanupChan: make(chan proto.Message),
+ writeCommitGraphChan: make(chan proto.Message),
+ midxRepackChan: make(chan proto.Message),
+ optimizeRepositoryChan: make(chan proto.Message),
+ pruneUnreachableObjectsChan: make(chan proto.Message),
+ packRefsChan: make(chan proto.Message),
}
}
@@ -556,6 +568,13 @@ func (m *mockServer) OptimizeRepository(ctx context.Context, in *gitalypb.Optimi
return &gitalypb.OptimizeRepositoryResponse{}, nil
}
+func (m *mockServer) PruneUnreachableObjects(ctx context.Context, in *gitalypb.PruneUnreachableObjectsRequest) (*gitalypb.PruneUnreachableObjectsResponse, error) {
+ go func() {
+ m.pruneUnreachableObjectsChan <- in
+ }()
+ return &gitalypb.PruneUnreachableObjectsResponse{}, nil
+}
+
func (m *mockServer) PackRefs(ctx context.Context, in *gitalypb.PackRefsRequest) (*gitalypb.PackRefsResponse, error) {
go func() {
m.packRefsChan <- in