Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2021-02-03 10:45:03 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2021-02-19 08:55:57 +0300
commitfc93593cb6b417e3fda6cf1fa9e34c85ff464c95 (patch)
tree728e224f1225b18041b6526f91a12d15ea3e35dc
parente17cb5f018a693a46b71fab5ef1d2459a4df19b0 (diff)
praefect: Stop creating replication jobs on cleanup
The `RepositoryService.Cleanup` RPC performs a set of housekeeping tasks for git repositories. Most importantly, it will remove stale lockfiles, broken objects and broken references. One can thus consider this RPC to be in the same class as `RepackFull` and `RepackIncremental` in that they're doing essentially idempotent optimizations on a repository. But while these RPCs get special handling in Praefect's coordinator, `Cleanup` doesn't. While the lack of special handling for this RPC doesn't impact correctness, due to it being a mutator it will cause us to bump the repository generation whenever it gets executed. This is essentially useless as in non-corrupted repositories no user-visible change should occur. But the biggest downside of it bumping the generation number is that many mutating RPCs actually trigger a `Cleanup()` immediately before or after the RPC. As a result, even if transactions are used, we are forced to generate a replication job because of the `Cleanup()`, rendering transactions a lot less useful. Fix this issue by implementing special routing for those cleanup RPCs similar to how repacks and garbage collection is handled, allowing us to get rid of the generation bumping. As a result, we do not schedule replication jobs anymore. Note that originally, there had been two RPCs which got executed after another mutator: `Cleanup()` and `ApplyGitattributes()`, which in combination caused us to bump the generation twice. Turns out we were implicitly relying on this generation bump for pushes though: when a push was served, read requests inspecting the push must get routed to the primary node. Otherwise e.g. the `/internal/allowed` checks are broken because secondaries have a different quarantine directory for received objects than the primary. We never hit this issue because of the generation bump, which would've caused us to always route to the primary. But with both RPCs being fixed via this commit and 77c0166c3 (repository: Use transactions when writing gitattributes, 2021-01-28), all replicas are now considered up-to-date during the whole push, which in combination with reads distribution causes failing access checks. This bug has been fixed by implementing force-routing to primaries in 1102b0b67 (praefect: Implement force-routing to primary for node-manager router, 2021-02-03) and 4d877d7d5 (praefect: Implement force-routing to primary for per-repo router, 2021-02-03). But given that it also requires the GitLab Rail supstream change edab619aa1f (gitaly: Fix access checks with transactions and quarantine environments, 2021-02-05) which has only been merged with v13.9, we have to wait until at least v13.10 to land this fix.
-rw-r--r--changelogs/unreleased/pks-praefect-cleanup-replication.yml5
-rw-r--r--internal/praefect/coordinator.go6
-rw-r--r--internal/praefect/datastore/datastore.go2
-rw-r--r--internal/praefect/replicator.go19
-rw-r--r--internal/praefect/replicator_test.go21
5 files changed, 52 insertions, 1 deletions
diff --git a/changelogs/unreleased/pks-praefect-cleanup-replication.yml b/changelogs/unreleased/pks-praefect-cleanup-replication.yml
new file mode 100644
index 000000000..4faeb66df
--- /dev/null
+++ b/changelogs/unreleased/pks-praefect-cleanup-replication.yml
@@ -0,0 +1,5 @@
+---
+title: 'praefect: Stop creating replication jobs on cleanup'
+merge_request: 3120
+author:
+type: fixed
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 22135ab74..738bf8e0b 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -191,6 +191,12 @@ func getReplicationDetails(methodName string, m proto.Message) (datastore.Change
return "", nil, fmt.Errorf("protocol changed: for method %q expected message type '%T', got '%T'", methodName, req, m)
}
return datastore.RepackIncremental, nil, nil
+ case "/gitaly.RepositoryService/Cleanup":
+ req, ok := m.(*gitalypb.CleanupRequest)
+ if !ok {
+ return "", nil, fmt.Errorf("protocol changed: for method %q expected message type '%T', got '%T'", methodName, req, m)
+ }
+ return datastore.Cleanup, nil, nil
default:
return datastore.UpdateRepo, nil, nil
}
diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go
index 082719aed..da76a8654 100644
--- a/internal/praefect/datastore/datastore.go
+++ b/internal/praefect/datastore/datastore.go
@@ -55,6 +55,8 @@ const (
RepackFull = ChangeType("repack_full")
// RepackIncremental is when replication runs an incremental repack
RepackIncremental = ChangeType("repack_incremental")
+ // Cleanup is when replication runs a repo cleanup
+ Cleanup = ChangeType("cleanup")
)
func (ct ChangeType) String() string {
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 240c59ae3..8e65d6757 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -34,6 +34,8 @@ type Replicator interface {
RepackFull(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error
// RepackIncremental will do an incremental repack on the target repository
RepackIncremental(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error
+ // Cleanup will do a cleanup on the target repository
+ Cleanup(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error
}
type defaultReplicator struct {
@@ -256,6 +258,21 @@ func (dr defaultReplicator) RepackIncremental(ctx context.Context, event datasto
return err
}
+func (dr defaultReplicator) Cleanup(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
+ targetRepo := &gitalypb.Repository{
+ StorageName: event.Job.TargetNodeStorage,
+ RelativePath: event.Job.RelativePath,
+ }
+
+ repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC)
+
+ _, err := repoSvcClient.Cleanup(ctx, &gitalypb.CleanupRequest{
+ Repository: targetRepo,
+ })
+
+ return err
+}
+
func (dr defaultReplicator) RepackFull(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
targetRepo := &gitalypb.Repository{
StorageName: event.Job.TargetNodeStorage,
@@ -624,6 +641,8 @@ func (r ReplMgr) processReplicationEvent(ctx context.Context, event datastore.Re
err = r.replicator.RepackFull(ctx, event, targetCC)
case datastore.RepackIncremental:
err = r.replicator.RepackIncremental(ctx, event, targetCC)
+ case datastore.Cleanup:
+ err = r.replicator.Cleanup(ctx, event, targetCC)
default:
err = fmt.Errorf("unknown replication change type encountered: %q", event.Job.Change)
}
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 8ae562e9d..9894ece76 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -392,6 +392,9 @@ func TestPropagateReplicationJob(t *testing.T) {
_, err = repositoryClient.RepackIncremental(ctx, &gitalypb.RepackIncrementalRequest{Repository: repository})
require.NoError(t, err)
+ _, err = repositoryClient.Cleanup(ctx, &gitalypb.CleanupRequest{Repository: repository})
+ require.NoError(t, err)
+
primaryRepository := &gitalypb.Repository{StorageName: primaryStorage, RelativePath: repositoryRelativePath}
expectedPrimaryGcReq := &gitalypb.GarbageCollectRequest{
Repository: primaryRepository,
@@ -404,6 +407,9 @@ func TestPropagateReplicationJob(t *testing.T) {
expectedPrimaryRepackIncrementalReq := &gitalypb.RepackIncrementalRequest{
Repository: primaryRepository,
}
+ expectedPrimaryCleanup := &gitalypb.CleanupRequest{
+ Repository: primaryRepository,
+ }
replCtx, cancel := testhelper.Context()
defer cancel()
@@ -413,6 +419,7 @@ func TestPropagateReplicationJob(t *testing.T) {
waitForRequest(t, primaryServer.gcChan, expectedPrimaryGcReq, 5*time.Second)
waitForRequest(t, primaryServer.repackIncrChan, expectedPrimaryRepackIncrementalReq, 5*time.Second)
waitForRequest(t, primaryServer.repackFullChan, expectedPrimaryRepackFullReq, 5*time.Second)
+ waitForRequest(t, primaryServer.cleanupChan, expectedPrimaryCleanup, 5*time.Second)
secondaryRepository := &gitalypb.Repository{StorageName: secondaryStorage, RelativePath: repositoryRelativePath}
@@ -425,14 +432,18 @@ func TestPropagateReplicationJob(t *testing.T) {
expectedSecondaryRepackIncrementalReq := expectedPrimaryRepackIncrementalReq
expectedSecondaryRepackIncrementalReq.Repository = secondaryRepository
+ expectedSecondaryCleanup := expectedPrimaryCleanup
+ expectedSecondaryCleanup.Repository = secondaryRepository
+
// ensure secondary gitaly server received the expected requests
waitForRequest(t, secondaryServer.gcChan, expectedSecondaryGcReq, 5*time.Second)
waitForRequest(t, secondaryServer.repackIncrChan, expectedSecondaryRepackIncrementalReq, 5*time.Second)
waitForRequest(t, secondaryServer.repackFullChan, expectedSecondaryRepackFullReq, 5*time.Second)
+ waitForRequest(t, secondaryServer.cleanupChan, expectedSecondaryCleanup, 5*time.Second)
}
type mockRepositoryServer struct {
- gcChan, repackFullChan, repackIncrChan chan proto.Message
+ gcChan, repackFullChan, repackIncrChan, cleanupChan chan proto.Message
gitalypb.UnimplementedRepositoryServiceServer
}
@@ -442,6 +453,7 @@ func newMockRepositoryServer() *mockRepositoryServer {
gcChan: make(chan proto.Message),
repackFullChan: make(chan proto.Message),
repackIncrChan: make(chan proto.Message),
+ cleanupChan: make(chan proto.Message),
}
}
@@ -466,6 +478,13 @@ func (m *mockRepositoryServer) RepackIncremental(ctx context.Context, in *gitaly
return &gitalypb.RepackIncrementalResponse{}, nil
}
+func (m *mockRepositoryServer) Cleanup(ctx context.Context, in *gitalypb.CleanupRequest) (*gitalypb.CleanupResponse, error) {
+ go func() {
+ m.cleanupChan <- in
+ }()
+ return &gitalypb.CleanupResponse{}, nil
+}
+
func runMockRepositoryServer(t *testing.T) (*mockRepositoryServer, string, func()) {
server := testhelper.NewTestGrpcServer(t, nil, nil)
serverSocketPath := testhelper.GetTemporaryGitalySocketFileName(t)