diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2022-11-01 12:31:39 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2022-11-01 12:31:39 +0300 |
commit | b23dd3f6d212ae1f57a724551e0678d5a5b4a62d (patch) | |
tree | caae75f4e16e4f50b99c2a7287155f1e44f6e3c3 | |
parent | de97d932278e4917243ca7f27706c27128ed2ad8 (diff) | |
parent | 3199b86032298e3b52d62f4cd3dfcdc62afcc858 (diff) |
Merge branch 'jt-replicate-repo' into 'master'
Praefect: Replicate to existing repository
Closes #4288
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/4968
Merged-by: Sami Hiltunen <shiltunen@gitlab.com>
Approved-by: James Fargher <proglottis@gmail.com>
Approved-by: Sami Hiltunen <shiltunen@gitlab.com>
Co-authored-by: Justin Tobler <jtobler@gitlab.com>
-rw-r--r-- | internal/praefect/coordinator.go | 7 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 97 |
2 files changed, 104 insertions, 0 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 11ac5e09c..eeefc43ea 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -385,6 +385,13 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall switch change { case datastore.CreateRepo: route, err = c.router.RouteRepositoryCreation(ctx, virtualStorage, targetRepo.RelativePath, additionalRepoRelativePath) + + // ReplicateRepository RPC should also be able to replicate if repository ID already exists in Praefect. + if call.fullMethodName == "/gitaly.RepositoryService/ReplicateRepository" && + errors.Is(err, commonerr.ErrRepositoryAlreadyExists) { + change = datastore.UpdateRepo + route, err = c.router.RouteRepositoryMutator(ctx, virtualStorage, targetRepo.RelativePath, additionalRepoRelativePath) + } if err != nil { return nil, fmt.Errorf("route repository creation: %w", err) } diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 8b340e5a0..ec726b1b5 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -508,6 +508,93 @@ func testStreamDirectorMutatorSecondaryErrorHandling(t *testing.T, ctx context.C ctxCancel() } +func TestStreamDirectorMutator_ReplicateRepository(t *testing.T) { + ctx := testhelper.Context(t) + + socket := testhelper.GetTemporaryGitalySocketFileName(t) + testhelper.NewServerWithHealth(t, socket) + + // Setup config with two virtual storages. + conf := config.Config{ + VirtualStorages: []*config.VirtualStorage{ + { + Name: "praefect-1", + Nodes: []*config.Node{{Address: "unix://" + socket, Storage: "praefect-internal-1"}}, + }, + { + Name: "praefect-2", + Nodes: []*config.Node{{Address: "unix://" + socket, Storage: "praefect-internal-2"}}, + }, + }, + } + + nodeMgr, err := nodes.NewManager(testhelper.NewDiscardingLogEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil) + require.NoError(t, err) + nodeMgr.Start(0, time.Hour) + defer nodeMgr.Stop() + + incrementGenerationInvoked := false + rs := datastore.MockRepositoryStore{ + GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { + return relativePath, map[string]struct{}{"praefect-internal-2": {}}, nil + }, + CreateRepositoryFunc: func(ctx context.Context, repositoryID int64, virtualStorage, relativePath, replicaPath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error { + require.Fail(t, "CreateRepository should not be called") + return nil + }, + IncrementGenerationFunc: func(ctx context.Context, repositoryID int64, primary string, secondaries []string) error { + incrementGenerationInvoked = true + return nil + }, + } + + router := mockRouter{ + // Simulate scenario where target repository already exists and error is returned. + routeRepositoryCreation: func(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error) { + return RepositoryMutatorRoute{}, fmt.Errorf("reserve repository id: %w", commonerr.ErrRepositoryAlreadyExists) + }, + // Pass through normally to handle route creation. + routeRepositoryMutator: func(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error) { + return NewNodeManagerRouter(nodeMgr, rs).RouteRepositoryMutator(ctx, virtualStorage, relativePath, additionalRepoRelativePath) + }, + } + + coordinator := NewCoordinator( + &datastore.MockReplicationEventQueue{}, + rs, + router, + transactions.NewManager(conf), + conf, + protoregistry.GitalyProtoPreregistered, + ) + + fullMethod := "/gitaly.RepositoryService/ReplicateRepository" + + frame, err := proto.Marshal(&gitalypb.ReplicateRepositoryRequest{ + Repository: &gitalypb.Repository{ + StorageName: "praefect-2", + RelativePath: "/path/to/hashed/storage", + }, + Source: &gitalypb.Repository{ + StorageName: "praefect-1", + RelativePath: "/path/to/hashed/storage", + }, + }) + require.NoError(t, err) + peeker := &mockPeeker{frame} + + // Validate that stream parameters can be constructed successfully for + // `ReplicateRepository` when the target repository already exists. + streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker) + require.NoError(t, err) + + // Validate that `CreateRepository()` is not invoked and `IncrementGeneration()` + // is when target repository already exists. + err = streamParams.RequestFinalizer() + require.NoError(t, err) + require.True(t, incrementGenerationInvoked) +} + func TestStreamDirector_maintenance(t *testing.T) { t.Parallel() @@ -956,12 +1043,22 @@ func (m *mockMaintenanceServer) PackRefs(ctx context.Context, in *gitalypb.PackR type mockRouter struct { Router routeRepositoryAccessorFunc func(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RepositoryAccessorRoute, error) + routeRepositoryCreation func(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error) + routeRepositoryMutator func(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error) } func (m mockRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RepositoryAccessorRoute, error) { return m.routeRepositoryAccessorFunc(ctx, virtualStorage, relativePath, forcePrimary) } +func (m mockRouter) RouteRepositoryCreation(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error) { + return m.routeRepositoryCreation(ctx, virtualStorage, relativePath, additionalRepoRelativePath) +} + +func (m mockRouter) RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error) { + return m.routeRepositoryMutator(ctx, virtualStorage, relativePath, additionalRepoRelativePath) +} + func TestStreamDirectorAccessor(t *testing.T) { t.Parallel() gitalySocket := testhelper.GetTemporaryGitalySocketFileName(t) |