diff options
author | Pavlo Strokov <pstrokov@gitlab.com> | 2021-06-30 18:04:35 +0300 |
---|---|---|
committer | Pavlo Strokov <pstrokov@gitlab.com> | 2021-06-30 18:04:35 +0300 |
commit | 82a7a8e90f5bf3f0cae18d158a28eb8a7a1693c6 (patch) | |
tree | b72c66d9cfdf3a897cd1507eb3682d1fa9dd5cb2 | |
parent | 35f931e7acbaaede52c39537f2d47b06a4c95ea7 (diff) | |
parent | 62ed25098036ea988d3592ede7b72ac4220029a1 (diff) |
Merge branch 'pks-tx-replicate-repository-nested-mutators' into 'master'
repository: Fix repo replication with transactions
See merge request gitlab-org/gitaly!3630
-rw-r--r-- | internal/gitaly/service/repository/replicate.go | 12 | ||||
-rw-r--r-- | internal/gitaly/service/repository/replicate_test.go | 87 | ||||
-rw-r--r-- | internal/metadata/featureflag/feature_flags.go | 5 |
3 files changed, 104 insertions, 0 deletions
diff --git a/internal/gitaly/service/repository/replicate.go b/internal/gitaly/service/repository/replicate.go index 0cd9d62a2..08d1fcbcd 100644 --- a/internal/gitaly/service/repository/replicate.go +++ b/internal/gitaly/service/repository/replicate.go @@ -13,7 +13,9 @@ import ( "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "gitlab.com/gitlab-org/gitaly/v14/internal/command" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/remote" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/safe" "gitlab.com/gitlab-org/gitaly/v14/internal/storage" "gitlab.com/gitlab-org/gitaly/v14/internal/tempdir" @@ -190,6 +192,16 @@ func (s *server) createFromSnapshot(ctx context.Context, in *gitalypb.ReplicateR } func (s *server) syncRepository(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) error { + if featureflag.IsEnabled(ctx, featureflag.ReplicateRepositoryDirectFetch) { + repo := s.localrepo(in.GetRepository()) + + if err := remote.FetchInternalRemote(ctx, s.cfg, s.conns, repo, in.GetSource()); err != nil { + return fmt.Errorf("fetch internal remote: %w", err) + } + + return nil + } + remoteClient, err := s.newRemoteClient(ctx) if err != nil { return fmt.Errorf("new client: %w", err) diff --git a/internal/gitaly/service/repository/replicate_test.go b/internal/gitaly/service/repository/replicate_test.go index d3fd33eec..a20100787 100644 --- a/internal/gitaly/service/repository/replicate_test.go +++ b/internal/gitaly/service/repository/replicate_test.go @@ -9,13 +9,17 @@ import ( "testing" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/internal/backchannel" "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service" + "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/internal/helper/text" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" + "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/txinfo" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -94,6 +98,89 @@ func TestReplicateRepository(t *testing.T) { gittest.Exec(t, cfg, "-C", targetRepoPath, "cat-file", "-p", blobID) } +func TestReplicateRepository_transactional(t *testing.T) { + testhelper.NewFeatureSets([]featureflag.FeatureFlag{ + featureflag.ReplicateRepositoryDirectFetch, + }).Run(t, testReplicateRepositoryTransactional) +} + +func testReplicateRepositoryTransactional(t *testing.T, ctx context.Context) { + cfgBuilder := testcfg.NewGitalyCfgBuilder(testcfg.WithStorages("default", "replica")) + cfg := cfgBuilder.Build(t) + + testhelper.ConfigureGitalyHooksBin(t, cfg) + testhelper.ConfigureGitalySSHBin(t, cfg) + + serverSocketPath := runRepositoryServerWithConfig(t, cfg, nil, testserver.WithDisablePraefect()) + cfg.SocketPath = serverSocketPath + + sourceRepo, sourceRepoPath, cleanup := gittest.CloneRepoAtStorage(t, cfg, cfg.Storages[0], "source") + t.Cleanup(cleanup) + + targetRepo := proto.Clone(sourceRepo).(*gitalypb.Repository) + targetRepo.StorageName = cfg.Storages[1].Name + + votes := 0 + txServer := testTransactionServer{ + vote: func(request *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) { + votes++ + return &gitalypb.VoteTransactionResponse{ + State: gitalypb.VoteTransactionResponse_COMMIT, + }, nil + }, + } + + ctx, err := txinfo.InjectTransaction(ctx, 1, "primary", true) + require.NoError(t, err) + ctx = helper.IncomingToOutgoing(ctx) + ctx = testhelper.MergeOutgoingMetadata(ctx, testhelper.GitalyServersMetadataFromCfg(t, cfg)) + + client := newMuxedRepositoryClient(t, ctx, cfg, serverSocketPath, backchannel.NewClientHandshaker( + testhelper.DiscardTestEntry(t), + func() backchannel.Server { + srv := grpc.NewServer() + gitalypb.RegisterRefTransactionServer(srv, &txServer) + return srv + }, + )) + + // The first invocation creates the repository via a snapshot given that it doesn't yet + // exist. + _, err = client.ReplicateRepository(ctx, &gitalypb.ReplicateRepositoryRequest{ + Repository: targetRepo, + Source: sourceRepo, + }) + + require.NoError(t, err) + require.Equal(t, 1, votes) + + // We're now changing a reference in the source repository such that we can observe changes + // in the target repo. + gittest.Exec(t, cfg, "-C", sourceRepoPath, "update-ref", "refs/heads/master", "refs/heads/master~") + + votes = 0 + + // And the second invocation uses FetchInternalRemote. + _, err = client.ReplicateRepository(ctx, &gitalypb.ReplicateRepositoryRequest{ + Repository: targetRepo, + Source: sourceRepo, + }) + + if featureflag.IsEnabled(ctx, featureflag.ReplicateRepositoryDirectFetch) { + require.NoError(t, err) + require.Equal(t, 2, votes) + } else { + // This is failing because we do a nested mutating RPC in `ReplicateRepository()` to + // `FetchInternalRemote()`. Because we simply pass along the incoming context as an + // outgoing one, the server would try to vote on the backchannel. But given that the + // connection is not to Praefect but to Gitaly now, it's trying to cast votes on a + // non-multiplexed Gitaly connection instead of against the expected Praefect peer. + require.Error(t, err) + require.Contains(t, err.Error(), "ref updates aborted by hook") + require.Equal(t, 0, votes) + } +} + func TestReplicateRepositoryInvalidArguments(t *testing.T) { testCases := []struct { description string diff --git a/internal/metadata/featureflag/feature_flags.go b/internal/metadata/featureflag/feature_flags.go index 3258c058c..44dbcadac 100644 --- a/internal/metadata/featureflag/feature_flags.go +++ b/internal/metadata/featureflag/feature_flags.go @@ -20,6 +20,10 @@ var ( // order to reduce the number of transactional votes. CreateRepositoryFromBundleAtomicFetch = FeatureFlag{Name: "create_repository_from_bundle_atomic_fetch", OnByDefault: false} ResolveConflictsWithHooks = FeatureFlag{Name: "resolve_conflicts_with_hooks", OnByDefault: false} + // ReplicateRepositoryDirectFetch will cause the ReplicateRepository RPC to perform fetches + // via a direct call instead of doing an RPC call to its own server. This fixes calls of + // `ReplicateRepository()` in case it's invoked via Praefect with transactions enabled. + ReplicateRepositoryDirectFetch = FeatureFlag{Name: "replicate_repository_direct_fetch", OnByDefault: false} ) // All includes all feature flags. @@ -29,4 +33,5 @@ var All = []FeatureFlag{ LFSPointersPipeline, CreateRepositoryFromBundleAtomicFetch, ResolveConflictsWithHooks, + ReplicateRepositoryDirectFetch, } |