diff options
author | John Cai <jcai@gitlab.com> | 2020-03-21 00:56:17 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2020-03-21 01:29:15 +0300 |
commit | 76d7c0ae01574b4696372605a8ff721ec4963775 (patch) | |
tree | 75e41966caf75ee74956b592497757a7dfe11fc8 | |
parent | 1d8a0091862afee5776465ed9a018b542c418482 (diff) |
Do not replicate certain mutator RPCsjc-do-not-replicate-rpcs
GarbageCollect, RepackFull, RepackIncremental are mutations to the
repository that do not affect replication at all since replication uses
fetch.
-rw-r--r-- | changelogs/unreleased/jc-do-not-replicate-rpcs.yml | 5 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 15 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 77 |
3 files changed, 97 insertions, 0 deletions
diff --git a/changelogs/unreleased/jc-do-not-replicate-rpcs.yml b/changelogs/unreleased/jc-do-not-replicate-rpcs.yml new file mode 100644 index 000000000..8478b6089 --- /dev/null +++ b/changelogs/unreleased/jc-do-not-replicate-rpcs.yml @@ -0,0 +1,5 @@ +--- +title: Do not replicate certain mutator RPCs +merge_request: 1958 +author: +type: added diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 8c97df7a6..565ffae7d 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -34,6 +34,17 @@ func getReplicationDetails(methodName string, m proto.Message) (datastore.Change } } +var doNotReplicate = map[string]struct{}{ + "/gitaly.RepositoryService/GarbageCollect": struct{}{}, + "/gitaly.RepositoryService/RepackFull": struct{}{}, + "/gitaly.RepositoryService/RepackIncremental": struct{}{}, +} + +func skipReplication(fullMethodName string) bool { + _, ok := doNotReplicate[fullMethodName] + return ok +} + // Coordinator takes care of directing client requests to the appropriate // downstream server. The coordinator is thread safe; concurrent calls to // register nodes are safe. @@ -85,6 +96,10 @@ func (c *Coordinator) directRepositoryScopedMessage(ctx context.Context, mi prot var requestFinalizer func() + if skipReplication(fullMethodName) { + return proxy.NewStreamParameters(ctx, primary.GetConnection(), requestFinalizer, nil), nil + } + if mi.Operation == protoregistry.OpMutator { change, params, err := getReplicationDetails(fullMethodName, m) if err != nil { diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 362eb6787..f0b1b6452 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -198,3 +198,80 @@ func TestAbsentCorrelationID(t *testing.T) { require.NotZero(t, jobs[0].CorrelationID, "the coordinator should have generated a random ID") } + +func TestStreamDirector_SkipReplication(t *testing.T) { + conf := config.Config{ + VirtualStorages: []*config.VirtualStorage{ + &config.VirtualStorage{ + Name: "praefect", + Nodes: []*models.Node{ + &models.Node{ + Address: "tcp://gitaly-primary.example.com", + Storage: "default", + DefaultPrimary: true, + }, + &models.Node{ + Address: "tcp://gitaly-backup1.example.com", + Storage: "backup", + }}, + }, + }, + } + + logEntry := testhelper.DiscardTestEntry(t) + ds := datastore.NewInMemory(conf) + + nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, promtest.NewMockHistogramVec()) + require.NoError(t, err) + + registry := protoregistry.New() + require.NoError(t, registry.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...)) + + coordinator := NewCoordinator(logEntry, ds, nodeMgr, conf, registry) + + prf := NewServer( + coordinator.StreamDirector, + logEntry, + registry, + conf, + ) + defer prf.Stop() + + listener, port := listenAvailPort(t) + go prf.Serve(listener, false) + + ctx, cancel := testhelper.Context() + defer cancel() + + repositoryClient := gitalypb.NewRepositoryServiceClient(dialLocalPort(t, port, false)) + + testRepo, _, cleanup := testhelper.NewTestRepo(t) + defer cleanup() + testRepo.StorageName = "praefect" + + // since we have no gitaly backends configured, these requests will all fail. + repositoryClient.GarbageCollect(ctx, &gitalypb.GarbageCollectRequest{ + Repository: testRepo, + }) + repositoryClient.RepackFull(ctx, &gitalypb.RepackFullRequest{ + Repository: testRepo, + }) + repositoryClient.RepackIncremental(ctx, &gitalypb.RepackIncrementalRequest{ + Repository: testRepo, + }) + + jobs, err := ds.GetJobs([]datastore.JobState{datastore.JobStateReady}, "backup", 10) + require.NoError(t, err) + require.Len(t, jobs, 0) + + repositoryClient.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{ + Repository: testRepo, + }) + + jobs, err = ds.GetJobs([]datastore.JobState{datastore.JobStateReady}, "backup", 10) + require.NoError(t, err) + require.Len(t, jobs, 1) + + // since the above requests failed, the jobs should be in JobStatePending rather than + // JobStateReady. This is a bug and is tracked via https://gitlab.com/gitlab-org/gitaly/-/issues/2566 +} |