Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2022-11-01 12:31:39 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2022-11-01 12:31:39 +0300
commitb23dd3f6d212ae1f57a724551e0678d5a5b4a62d (patch)
treecaae75f4e16e4f50b99c2a7287155f1e44f6e3c3
parentde97d932278e4917243ca7f27706c27128ed2ad8 (diff)
parent3199b86032298e3b52d62f4cd3dfcdc62afcc858 (diff)
Merge branch 'jt-replicate-repo' into 'master'
Praefect: Replicate to existing repository Closes #4288 See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/4968 Merged-by: Sami Hiltunen <shiltunen@gitlab.com> Approved-by: James Fargher <proglottis@gmail.com> Approved-by: Sami Hiltunen <shiltunen@gitlab.com> Co-authored-by: Justin Tobler <jtobler@gitlab.com>
-rw-r--r--internal/praefect/coordinator.go7
-rw-r--r--internal/praefect/coordinator_test.go97
2 files changed, 104 insertions, 0 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 11ac5e09c..eeefc43ea 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -385,6 +385,13 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
switch change {
case datastore.CreateRepo:
route, err = c.router.RouteRepositoryCreation(ctx, virtualStorage, targetRepo.RelativePath, additionalRepoRelativePath)
+
+ // ReplicateRepository RPC should also be able to replicate if repository ID already exists in Praefect.
+ if call.fullMethodName == "/gitaly.RepositoryService/ReplicateRepository" &&
+ errors.Is(err, commonerr.ErrRepositoryAlreadyExists) {
+ change = datastore.UpdateRepo
+ route, err = c.router.RouteRepositoryMutator(ctx, virtualStorage, targetRepo.RelativePath, additionalRepoRelativePath)
+ }
if err != nil {
return nil, fmt.Errorf("route repository creation: %w", err)
}
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 8b340e5a0..ec726b1b5 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -508,6 +508,93 @@ func testStreamDirectorMutatorSecondaryErrorHandling(t *testing.T, ctx context.C
ctxCancel()
}
+func TestStreamDirectorMutator_ReplicateRepository(t *testing.T) {
+ ctx := testhelper.Context(t)
+
+ socket := testhelper.GetTemporaryGitalySocketFileName(t)
+ testhelper.NewServerWithHealth(t, socket)
+
+ // Setup config with two virtual storages.
+ conf := config.Config{
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: "praefect-1",
+ Nodes: []*config.Node{{Address: "unix://" + socket, Storage: "praefect-internal-1"}},
+ },
+ {
+ Name: "praefect-2",
+ Nodes: []*config.Node{{Address: "unix://" + socket, Storage: "praefect-internal-2"}},
+ },
+ },
+ }
+
+ nodeMgr, err := nodes.NewManager(testhelper.NewDiscardingLogEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
+ require.NoError(t, err)
+ nodeMgr.Start(0, time.Hour)
+ defer nodeMgr.Stop()
+
+ incrementGenerationInvoked := false
+ rs := datastore.MockRepositoryStore{
+ GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) {
+ return relativePath, map[string]struct{}{"praefect-internal-2": {}}, nil
+ },
+ CreateRepositoryFunc: func(ctx context.Context, repositoryID int64, virtualStorage, relativePath, replicaPath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error {
+ require.Fail(t, "CreateRepository should not be called")
+ return nil
+ },
+ IncrementGenerationFunc: func(ctx context.Context, repositoryID int64, primary string, secondaries []string) error {
+ incrementGenerationInvoked = true
+ return nil
+ },
+ }
+
+ router := mockRouter{
+ // Simulate scenario where target repository already exists and error is returned.
+ routeRepositoryCreation: func(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error) {
+ return RepositoryMutatorRoute{}, fmt.Errorf("reserve repository id: %w", commonerr.ErrRepositoryAlreadyExists)
+ },
+ // Pass through normally to handle route creation.
+ routeRepositoryMutator: func(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error) {
+ return NewNodeManagerRouter(nodeMgr, rs).RouteRepositoryMutator(ctx, virtualStorage, relativePath, additionalRepoRelativePath)
+ },
+ }
+
+ coordinator := NewCoordinator(
+ &datastore.MockReplicationEventQueue{},
+ rs,
+ router,
+ transactions.NewManager(conf),
+ conf,
+ protoregistry.GitalyProtoPreregistered,
+ )
+
+ fullMethod := "/gitaly.RepositoryService/ReplicateRepository"
+
+ frame, err := proto.Marshal(&gitalypb.ReplicateRepositoryRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: "praefect-2",
+ RelativePath: "/path/to/hashed/storage",
+ },
+ Source: &gitalypb.Repository{
+ StorageName: "praefect-1",
+ RelativePath: "/path/to/hashed/storage",
+ },
+ })
+ require.NoError(t, err)
+ peeker := &mockPeeker{frame}
+
+ // Validate that stream parameters can be constructed successfully for
+ // `ReplicateRepository` when the target repository already exists.
+ streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker)
+ require.NoError(t, err)
+
+ // Validate that `CreateRepository()` is not invoked and `IncrementGeneration()`
+ // is when target repository already exists.
+ err = streamParams.RequestFinalizer()
+ require.NoError(t, err)
+ require.True(t, incrementGenerationInvoked)
+}
+
func TestStreamDirector_maintenance(t *testing.T) {
t.Parallel()
@@ -956,12 +1043,22 @@ func (m *mockMaintenanceServer) PackRefs(ctx context.Context, in *gitalypb.PackR
type mockRouter struct {
Router
routeRepositoryAccessorFunc func(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RepositoryAccessorRoute, error)
+ routeRepositoryCreation func(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error)
+ routeRepositoryMutator func(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error)
}
func (m mockRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RepositoryAccessorRoute, error) {
return m.routeRepositoryAccessorFunc(ctx, virtualStorage, relativePath, forcePrimary)
}
+func (m mockRouter) RouteRepositoryCreation(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error) {
+ return m.routeRepositoryCreation(ctx, virtualStorage, relativePath, additionalRepoRelativePath)
+}
+
+func (m mockRouter) RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error) {
+ return m.routeRepositoryMutator(ctx, virtualStorage, relativePath, additionalRepoRelativePath)
+}
+
func TestStreamDirectorAccessor(t *testing.T) {
t.Parallel()
gitalySocket := testhelper.GetTemporaryGitalySocketFileName(t)