diff options
author | Toon Claes <toon@gitlab.com> | 2022-11-24 23:27:38 +0300 |
---|---|---|
committer | Toon Claes <toon@gitlab.com> | 2022-11-24 23:27:38 +0300 |
commit | c04d7ca0ce226fea3c83b53398e9671958bbe2be (patch) | |
tree | 0e6eb67526a6e2747c4c20074c4e6c998beea02e | |
parent | 1ddd1adc79237051b0325002a4f3d8864add8180 (diff) | |
parent | 2875608d6c90d6c9f0bee86414a645b067478b5b (diff) |
Merge branch 'smh-backport-rr-15-4' into '15-4-stable'
Allow differing relative paths in ReplicateRepository (15.4)
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/5083
Merged-by: Toon Claes <toon@gitlab.com>
Approved-by: karthik nayak <knayak@gitlab.com>
Approved-by: Toon Claes <toon@gitlab.com>
Co-authored-by: Sami Hiltunen <shiltunen@gitlab.com>
Co-authored-by: Justin Tobler <jtobler@gitlab.com>
-rw-r--r-- | internal/gitaly/service/repository/replicate.go | 4 | ||||
-rw-r--r-- | internal/gitaly/service/repository/replicate_test.go | 64 | ||||
-rw-r--r-- | internal/gitaly/service/server/disk_stats_test.go | 28 | ||||
-rw-r--r-- | internal/praefect/config/config.go | 7 | ||||
-rw-r--r-- | internal/praefect/config/config_test.go | 10 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 7 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 97 | ||||
-rw-r--r-- | internal/testhelper/testserver/gitaly.go | 30 |
8 files changed, 174 insertions, 73 deletions
diff --git a/internal/gitaly/service/repository/replicate.go b/internal/gitaly/service/repository/replicate.go index 67a52eca3..8efdaecab 100644 --- a/internal/gitaly/service/repository/replicate.go +++ b/internal/gitaly/service/repository/replicate.go @@ -98,10 +98,6 @@ func validateReplicateRepository(in *gitalypb.ReplicateRepositoryRequest) error return errors.New("source repository cannot be empty") } - if in.GetRepository().GetRelativePath() != in.GetSource().GetRelativePath() { - return errors.New("both source and repository should have the same relative path") - } - if in.GetRepository().GetStorageName() == in.GetSource().GetStorageName() { return errors.New("repository and source have the same storage") } diff --git a/internal/gitaly/service/repository/replicate_test.go b/internal/gitaly/service/repository/replicate_test.go index 89ca307b8..2841cf133 100644 --- a/internal/gitaly/service/repository/replicate_test.go +++ b/internal/gitaly/service/repository/replicate_test.go @@ -47,7 +47,7 @@ func TestReplicateRepository(t *testing.T) { testcfg.BuildGitalyHooks(t, cfg) testcfg.BuildGitalySSH(t, cfg) - client, serverSocketPath := runRepositoryService(t, cfg, nil, testserver.WithDisablePraefect()) + client, serverSocketPath := runRepositoryService(t, cfg, nil) cfg.SocketPath = serverSocketPath repo, repoPath := gittest.CreateRepository(ctx, t, cfg, gittest.CreateRepositoryConfig{ @@ -81,9 +81,10 @@ func TestReplicateRepository(t *testing.T) { Repository: targetRepo, Source: repo, }) + require.NoError(t, err) - targetRepoPath := filepath.Join(cfg.Storages[1].Path, targetRepo.GetRelativePath()) + targetRepoPath := filepath.Join(cfg.Storages[1].Path, gittest.GetReplicaPath(ctx, t, cfg, targetRepo)) gittest.Exec(t, cfg, "-C", targetRepoPath, "fsck") replicatedAttrFilePath := filepath.Join(targetRepoPath, "info", "attributes") @@ -120,7 +121,7 @@ func TestReplicateRepository_hiddenRefs(t *testing.T) { testcfg.BuildGitalyHooks(t, cfg) testcfg.BuildGitalySSH(t, cfg) - client, serverSocketPath := runRepositoryService(t, cfg, nil, testserver.WithDisablePraefect()) + client, serverSocketPath := runRepositoryService(t, cfg, nil) cfg.SocketPath = serverSocketPath ctx = testhelper.MergeOutgoingMetadata(ctx, testcfg.GitalyServersMetadataFromCfg(t, cfg)) @@ -139,7 +140,6 @@ func TestReplicateRepository_hiddenRefs(t *testing.T) { targetRepo := proto.Clone(sourceRepo).(*gitalypb.Repository) targetRepo.StorageName = cfg.Storages[1].Name - targetRepoPath := filepath.Join(cfg.Storages[1].Path, targetRepo.GetRelativePath()) _, err := client.ReplicateRepository(ctx, &gitalypb.ReplicateRepositoryRequest{ Repository: targetRepo, @@ -147,6 +147,7 @@ func TestReplicateRepository_hiddenRefs(t *testing.T) { }) require.NoError(t, err) + targetRepoPath := filepath.Join(cfg.Storages[1].Path, gittest.GetReplicaPath(ctx, t, cfg, targetRepo)) require.ElementsMatch(t, expectedRefs, strings.Split(text.ChompBytes(gittest.Exec(t, cfg, "-C", targetRepoPath, "for-each-ref")), "\n")) // Perform another sanity-check to verify that source and target repository have the @@ -183,6 +184,7 @@ func TestReplicateRepository_hiddenRefs(t *testing.T) { Repository: targetRepo, Source: sourceRepo, }) + require.NoError(t, err) // Verify that the references for both repositories match. @@ -243,6 +245,7 @@ func TestReplicateRepositoryTransactional(t *testing.T) { Repository: targetRepo, Source: sourceRepo, }) + require.NoError(t, err) // There is no gitattributes file, so we vote on the empty contents of that file. @@ -337,20 +340,6 @@ func TestReplicateRepositoryInvalidArguments(t *testing.T) { expectedError: "repository cannot be empty", }, { - description: "source and repository have different relative paths", - input: &gitalypb.ReplicateRepositoryRequest{ - Repository: &gitalypb.Repository{ - StorageName: "praefect-internal-0", - RelativePath: "/ab/cd/abcdef1234", - }, - Source: &gitalypb.Repository{ - StorageName: "praefect-internal-1", - RelativePath: "/ab/cd/abcdef4321", - }, - }, - expectedError: "both source and repository should have the same relative path", - }, - { description: "source and repository have the same storage", input: &gitalypb.ReplicateRepositoryRequest{ Repository: &gitalypb.Repository{ @@ -395,6 +384,15 @@ func TestReplicateRepository_BadRepository(t *testing.T) { desc: "source invalid", invalidSource: true, error: func(tb testing.TB, actual error) { + if testhelper.IsPraefectEnabled() { + // ReplicateRepository uses RepositoryExists to check whether the source repository exists on the target + // Gitaly. Gitaly returns NotFound if accessing a corrupt repository. Praefect relies on the metadata + // and returns that the repository still exists, causing this test to hit a different code path and diverge + // in behavior. + require.ErrorContains(t, actual, "synchronizing gitattributes: GetRepoPath: not a git repository: ") + return + } + testhelper.RequireGrpcError(tb, ErrInvalidSourceRepository, actual) }, }, @@ -414,7 +412,7 @@ func TestReplicateRepository_BadRepository(t *testing.T) { testcfg.BuildGitalyHooks(t, cfg) testcfg.BuildGitalySSH(t, cfg) - client, serverSocketPath := runRepositoryService(t, cfg, nil, testserver.WithDisablePraefect()) + client, serverSocketPath := runRepositoryService(t, cfg, nil) cfg.SocketPath = serverSocketPath sourceRepo, _ := gittest.CreateRepository(ctx, t, cfg, gittest.CreateRepositoryConfig{ @@ -435,9 +433,10 @@ func TestReplicateRepository_BadRepository(t *testing.T) { locator := config.NewLocator(cfg) for _, invalidRepo := range invalidRepos { - invalidRepoPath, err := locator.GetPath(invalidRepo) + storagePath, err := locator.GetStorageByName(invalidRepo.StorageName) require.NoError(t, err) + invalidRepoPath := filepath.Join(storagePath, gittest.GetReplicaPath(ctx, t, cfg, invalidRepo)) // delete git data so make the repo invalid for _, path := range []string{"refs", "objects", "HEAD"} { require.NoError(t, os.RemoveAll(filepath.Join(invalidRepoPath, path))) @@ -469,34 +468,27 @@ func TestReplicateRepository_FailedFetchInternalRemote(t *testing.T) { testcfg.BuildGitalyHooks(t, cfg) testcfg.BuildGitalySSH(t, cfg) - // Our test setup does not allow for Praefects with multiple storages. We thus have to - // disable Praefect here. - client, socketPath := runRepositoryService(t, cfg, nil, testserver.WithDisablePraefect()) + client, socketPath := runRepositoryService(t, cfg, nil) cfg.SocketPath = socketPath targetRepo, _ := gittest.CreateRepository(ctx, t, cfg, gittest.CreateRepositoryConfig{ Storage: cfg.Storages[1], }) - // The source repository must be at the same path as the target repository, and it must be a - // real repository. In order to still have the fetch fail, we corrupt the repository by - // writing garbage into HEAD. - sourceRepo := &gitalypb.Repository{ - StorageName: "default", - RelativePath: targetRepo.RelativePath, - } - sourceRepoPath, err := config.NewLocator(cfg).GetPath(sourceRepo) - require.NoError(t, err) - require.NoError(t, os.MkdirAll(sourceRepoPath, 0o777)) - gittest.Exec(t, cfg, "init", "--bare", sourceRepoPath) + sourceRepo, sourceRepoPath := gittest.CreateRepository(ctx, t, cfg, gittest.CreateRepositoryConfig{ + Storage: cfg.Storages[0], + }) + + // We corrupt the repository by writing garbage into HEAD. require.NoError(t, os.WriteFile(filepath.Join(sourceRepoPath, "HEAD"), []byte("garbage"), 0o666)) ctx = testhelper.MergeOutgoingMetadata(ctx, testcfg.GitalyServersMetadataFromCfg(t, cfg)) - _, err = client.ReplicateRepository(ctx, &gitalypb.ReplicateRepositoryRequest{ + _, err := client.ReplicateRepository(ctx, &gitalypb.ReplicateRepositoryRequest{ Repository: targetRepo, Source: sourceRepo, }) + require.Error(t, err) require.Contains(t, err.Error(), "fetch: exit status 128") } @@ -573,7 +565,7 @@ func TestFetchInternalRemote_successful(t *testing.T) { referenceTransactionHookCalled++ return nil }), - ), testserver.WithDisablePraefect()) + )) ctx, err := storage.InjectGitalyServers(ctx, remoteRepo.GetStorageName(), remoteAddr, remoteCfg.Auth.Token) require.NoError(t, err) diff --git a/internal/gitaly/service/server/disk_stats_test.go b/internal/gitaly/service/server/disk_stats_test.go index f0e5f34f4..5dac8e170 100644 --- a/internal/gitaly/service/server/disk_stats_test.go +++ b/internal/gitaly/service/server/disk_stats_test.go @@ -26,7 +26,22 @@ func TestStorageDiskStatistics(t *testing.T) { c, err := client.DiskStatistics(ctx, &gitalypb.DiskStatisticsRequest{}) require.NoError(t, err) - require.Len(t, c.GetStorageStatuses(), len(cfg.Storages)) + expectedStorages := len(cfg.Storages) + if testhelper.IsPraefectEnabled() { + // Praefect does not virtualize StorageDiskStatistics correctly. It proxies the call to each Gitaly + // and returns the results of all of their storages. However, not all storages on a Gitaly node are + // necessarily part of a virtual storage. Likewise, Praefect should not expose the individual storages + // that make up a virtual storage externally but should instead provide a single result for a virtual + // storage. + // + // In our test setup, we have two storages on a single Gitaly node. Both of the storages are the only + // storage in their own virtual storages. Praefect returns statistics for all storages on a Gitaly node + // that is part of a virtual storage, so it ends up returning both results for both physical storages + // twice. + expectedStorages = 2 * len(cfg.Storages) + } + + require.Len(t, c.GetStorageStatuses(), expectedStorages) // used and available space may change so we check if it roughly matches (+/- 1GB) avail, used := getSpaceStats(t, cfg.Storages[0].Path) @@ -37,6 +52,17 @@ func TestStorageDiskStatistics(t *testing.T) { require.Equal(t, int64(0), c.GetStorageStatuses()[1].Available) require.Equal(t, int64(0), c.GetStorageStatuses()[1].Used) require.Equal(t, cfg.Storages[1].Name, c.GetStorageStatuses()[1].StorageName) + + if testhelper.IsPraefectEnabled() { + // This is incorrect behavior caused by the bug explained above. + approxEqual(t, c.GetStorageStatuses()[2].Available, avail) + approxEqual(t, c.GetStorageStatuses()[2].Used, used) + require.Equal(t, cfg.Storages[0].Name, c.GetStorageStatuses()[2].StorageName) + + require.Equal(t, int64(0), c.GetStorageStatuses()[3].Available) + require.Equal(t, int64(0), c.GetStorageStatuses()[3].Used) + require.Equal(t, cfg.Storages[1].Name, c.GetStorageStatuses()[3].StorageName) + } } func approxEqual(t *testing.T, a, b int64) { diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go index 5af6eccdb..66a94ca13 100644 --- a/internal/praefect/config/config.go +++ b/internal/praefect/config/config.go @@ -208,7 +208,6 @@ var ( errNoGitalyServers = errors.New("no primary gitaly backends configured") errNoListener = errors.New("no listen address or socket path configured") errNoVirtualStorages = errors.New("no virtual storages configured") - errStorageAddressDuplicate = errors.New("multiple storages have the same address") errVirtualStoragesNotUnique = errors.New("virtual storages must have unique names") errVirtualStorageUnnamed = errors.New("virtual storages must have a name") ) @@ -231,7 +230,6 @@ func (c *Config) Validate() error { return fmt.Errorf("replication batch size was %d but must be >=1", c.Replication.BatchSize) } - allAddresses := make(map[string]struct{}) virtualStorages := make(map[string]struct{}, len(c.VirtualStorages)) for _, virtualStorage := range c.VirtualStorages { @@ -262,11 +260,6 @@ func (c *Config) Validate() error { return fmt.Errorf("virtual storage %q: %w", virtualStorage.Name, errDuplicateStorage) } storages[node.Storage] = struct{}{} - - if _, found := allAddresses[node.Address]; found { - return fmt.Errorf("virtual storage %q: address %q : %w", virtualStorage.Name, node.Address, errStorageAddressDuplicate) - } - allAddresses[node.Address] = struct{}{} } if virtualStorage.DefaultReplicationFactor > len(virtualStorage.Nodes) { diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go index b0e1879f6..cc89c919b 100644 --- a/internal/praefect/config/config_test.go +++ b/internal/praefect/config/config_test.go @@ -176,16 +176,6 @@ func TestConfigValidation(t *testing.T) { errMsg: `virtual storage "secondary": no primary gitaly backends configured`, }, { - desc: "Node storage has address duplicate", - changeConfig: func(cfg *Config) { - cfg.VirtualStorages = []*VirtualStorage{ - {Name: "default", Nodes: vs1Nodes}, - {Name: "secondary", Nodes: append(vs2Nodes, vs1Nodes[1])}, - } - }, - errMsg: `multiple storages have the same address`, - }, - { desc: "default replication factor too high", changeConfig: func(cfg *Config) { cfg.VirtualStorages = []*VirtualStorage{ diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 5099a2391..af3411191 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -387,6 +387,13 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall switch change { case datastore.CreateRepo: route, err = c.router.RouteRepositoryCreation(ctx, virtualStorage, targetRepo.RelativePath, additionalRepoRelativePath) + + // ReplicateRepository RPC should also be able to replicate if repository ID already exists in Praefect. + if call.fullMethodName == "/gitaly.RepositoryService/ReplicateRepository" && + errors.Is(err, commonerr.ErrRepositoryAlreadyExists) { + change = datastore.UpdateRepo + route, err = c.router.RouteRepositoryMutator(ctx, virtualStorage, targetRepo.RelativePath, additionalRepoRelativePath) + } if err != nil { return nil, fmt.Errorf("route repository creation: %w", err) } diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 5722e8abc..579dfafdc 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -395,6 +395,93 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) { require.NoError(t, err) } +func TestStreamDirectorMutator_ReplicateRepository(t *testing.T) { + ctx := testhelper.Context(t) + + socket := testhelper.GetTemporaryGitalySocketFileName(t) + testhelper.NewServerWithHealth(t, socket) + + // Setup config with two virtual storages. + conf := config.Config{ + VirtualStorages: []*config.VirtualStorage{ + { + Name: "praefect-1", + Nodes: []*config.Node{{Address: "unix://" + socket, Storage: "praefect-internal-1"}}, + }, + { + Name: "praefect-2", + Nodes: []*config.Node{{Address: "unix://" + socket, Storage: "praefect-internal-2"}}, + }, + }, + } + + nodeMgr, err := nodes.NewManager(testhelper.NewDiscardingLogEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil) + require.NoError(t, err) + nodeMgr.Start(0, time.Hour) + defer nodeMgr.Stop() + + incrementGenerationInvoked := false + rs := datastore.MockRepositoryStore{ + GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { + return relativePath, map[string]struct{}{"praefect-internal-2": {}}, nil + }, + CreateRepositoryFunc: func(ctx context.Context, repositoryID int64, virtualStorage, relativePath, replicaPath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error { + require.Fail(t, "CreateRepository should not be called") + return nil + }, + IncrementGenerationFunc: func(ctx context.Context, repositoryID int64, primary string, secondaries []string) error { + incrementGenerationInvoked = true + return nil + }, + } + + router := mockRouter{ + // Simulate scenario where target repository already exists and error is returned. + routeRepositoryCreation: func(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error) { + return RepositoryMutatorRoute{}, fmt.Errorf("reserve repository id: %w", commonerr.ErrRepositoryAlreadyExists) + }, + // Pass through normally to handle route creation. + routeRepositoryMutator: func(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error) { + return NewNodeManagerRouter(nodeMgr, rs).RouteRepositoryMutator(ctx, virtualStorage, relativePath, additionalRepoRelativePath) + }, + } + + coordinator := NewCoordinator( + &datastore.MockReplicationEventQueue{}, + rs, + router, + transactions.NewManager(conf), + conf, + protoregistry.GitalyProtoPreregistered, + ) + + fullMethod := "/gitaly.RepositoryService/ReplicateRepository" + + frame, err := proto.Marshal(&gitalypb.ReplicateRepositoryRequest{ + Repository: &gitalypb.Repository{ + StorageName: "praefect-2", + RelativePath: "/path/to/hashed/storage", + }, + Source: &gitalypb.Repository{ + StorageName: "praefect-1", + RelativePath: "/path/to/hashed/storage", + }, + }) + require.NoError(t, err) + peeker := &mockPeeker{frame} + + // Validate that stream parameters can be constructed successfully for + // `ReplicateRepository` when the target repository already exists. + streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker) + require.NoError(t, err) + + // Validate that `CreateRepository()` is not invoked and `IncrementGeneration()` + // is when target repository already exists. + err = streamParams.RequestFinalizer() + require.NoError(t, err) + require.True(t, incrementGenerationInvoked) +} + func TestStreamDirector_maintenance(t *testing.T) { t.Parallel() @@ -843,12 +930,22 @@ func (m *mockMaintenanceServer) PackRefs(ctx context.Context, in *gitalypb.PackR type mockRouter struct { Router routeRepositoryAccessorFunc func(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RepositoryAccessorRoute, error) + routeRepositoryCreation func(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error) + routeRepositoryMutator func(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error) } func (m mockRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RepositoryAccessorRoute, error) { return m.routeRepositoryAccessorFunc(ctx, virtualStorage, relativePath, forcePrimary) } +func (m mockRouter) RouteRepositoryCreation(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error) { + return m.routeRepositoryCreation(ctx, virtualStorage, relativePath, additionalRepoRelativePath) +} + +func (m mockRouter) RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error) { + return m.routeRepositoryMutator(ctx, virtualStorage, relativePath, additionalRepoRelativePath) +} + func TestStreamDirectorAccessor(t *testing.T) { t.Parallel() gitalySocket := testhelper.GetTemporaryGitalySocketFileName(t) diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index db46ca039..7035af821 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -73,6 +73,20 @@ func StartGitalyServer(tb testing.TB, cfg config.Cfg, rubyServer *rubyserver.Ser } func runPraefectProxy(tb testing.TB, gitalyCfg config.Cfg, gitalyAddr string) PraefectServer { + var virtualStorages []*praefectconfig.VirtualStorage + for _, storage := range gitalyCfg.Storages { + virtualStorages = append(virtualStorages, &praefectconfig.VirtualStorage{ + Name: storage.Name, + Nodes: []*praefectconfig.Node{ + { + Storage: storage.Name, + Address: gitalyAddr, + Token: gitalyCfg.Auth.Token, + }, + }, + }) + } + return StartPraefect(tb, praefectconfig.Config{ SocketPath: testhelper.GetTemporaryGitalySocketFileName(tb), Auth: auth.Config{ @@ -88,21 +102,7 @@ func runPraefectProxy(tb testing.TB, gitalyCfg config.Cfg, gitalyAddr string) Pr Format: "json", Level: "info", }, - VirtualStorages: []*praefectconfig.VirtualStorage{ - { - // Only single storage will be served by the Praefect instance. We - // can't include more as it is invalid to use same address for - // different storages. - Name: gitalyCfg.Storages[0].Name, - Nodes: []*praefectconfig.Node{ - { - Storage: gitalyCfg.Storages[0].Name, - Address: gitalyAddr, - Token: gitalyCfg.Auth.Token, - }, - }, - }, - }, + VirtualStorages: virtualStorages, }) } |