Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavlo Strokov <pstrokov@gitlab.com>2021-06-30 18:04:35 +0300
committerPavlo Strokov <pstrokov@gitlab.com>2021-06-30 18:04:35 +0300
commit82a7a8e90f5bf3f0cae18d158a28eb8a7a1693c6 (patch)
treeb72c66d9cfdf3a897cd1507eb3682d1fa9dd5cb2
parent35f931e7acbaaede52c39537f2d47b06a4c95ea7 (diff)
parent62ed25098036ea988d3592ede7b72ac4220029a1 (diff)
Merge branch 'pks-tx-replicate-repository-nested-mutators' into 'master'
repository: Fix repo replication with transactions See merge request gitlab-org/gitaly!3630
-rw-r--r--internal/gitaly/service/repository/replicate.go12
-rw-r--r--internal/gitaly/service/repository/replicate_test.go87
-rw-r--r--internal/metadata/featureflag/feature_flags.go5
3 files changed, 104 insertions, 0 deletions
diff --git a/internal/gitaly/service/repository/replicate.go b/internal/gitaly/service/repository/replicate.go
index 0cd9d62a2..08d1fcbcd 100644
--- a/internal/gitaly/service/repository/replicate.go
+++ b/internal/gitaly/service/repository/replicate.go
@@ -13,7 +13,9 @@ import (
"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
"gitlab.com/gitlab-org/gitaly/v14/internal/command"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/remote"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/internal/safe"
"gitlab.com/gitlab-org/gitaly/v14/internal/storage"
"gitlab.com/gitlab-org/gitaly/v14/internal/tempdir"
@@ -190,6 +192,16 @@ func (s *server) createFromSnapshot(ctx context.Context, in *gitalypb.ReplicateR
}
func (s *server) syncRepository(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) error {
+ if featureflag.IsEnabled(ctx, featureflag.ReplicateRepositoryDirectFetch) {
+ repo := s.localrepo(in.GetRepository())
+
+ if err := remote.FetchInternalRemote(ctx, s.cfg, s.conns, repo, in.GetSource()); err != nil {
+ return fmt.Errorf("fetch internal remote: %w", err)
+ }
+
+ return nil
+ }
+
remoteClient, err := s.newRemoteClient(ctx)
if err != nil {
return fmt.Errorf("new client: %w", err)
diff --git a/internal/gitaly/service/repository/replicate_test.go b/internal/gitaly/service/repository/replicate_test.go
index d3fd33eec..a20100787 100644
--- a/internal/gitaly/service/repository/replicate_test.go
+++ b/internal/gitaly/service/repository/replicate_test.go
@@ -9,13 +9,17 @@ import (
"testing"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/backchannel"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper/text"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/txinfo"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@@ -94,6 +98,89 @@ func TestReplicateRepository(t *testing.T) {
gittest.Exec(t, cfg, "-C", targetRepoPath, "cat-file", "-p", blobID)
}
+func TestReplicateRepository_transactional(t *testing.T) {
+ testhelper.NewFeatureSets([]featureflag.FeatureFlag{
+ featureflag.ReplicateRepositoryDirectFetch,
+ }).Run(t, testReplicateRepositoryTransactional)
+}
+
+func testReplicateRepositoryTransactional(t *testing.T, ctx context.Context) {
+ cfgBuilder := testcfg.NewGitalyCfgBuilder(testcfg.WithStorages("default", "replica"))
+ cfg := cfgBuilder.Build(t)
+
+ testhelper.ConfigureGitalyHooksBin(t, cfg)
+ testhelper.ConfigureGitalySSHBin(t, cfg)
+
+ serverSocketPath := runRepositoryServerWithConfig(t, cfg, nil, testserver.WithDisablePraefect())
+ cfg.SocketPath = serverSocketPath
+
+ sourceRepo, sourceRepoPath, cleanup := gittest.CloneRepoAtStorage(t, cfg, cfg.Storages[0], "source")
+ t.Cleanup(cleanup)
+
+ targetRepo := proto.Clone(sourceRepo).(*gitalypb.Repository)
+ targetRepo.StorageName = cfg.Storages[1].Name
+
+ votes := 0
+ txServer := testTransactionServer{
+ vote: func(request *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) {
+ votes++
+ return &gitalypb.VoteTransactionResponse{
+ State: gitalypb.VoteTransactionResponse_COMMIT,
+ }, nil
+ },
+ }
+
+ ctx, err := txinfo.InjectTransaction(ctx, 1, "primary", true)
+ require.NoError(t, err)
+ ctx = helper.IncomingToOutgoing(ctx)
+ ctx = testhelper.MergeOutgoingMetadata(ctx, testhelper.GitalyServersMetadataFromCfg(t, cfg))
+
+ client := newMuxedRepositoryClient(t, ctx, cfg, serverSocketPath, backchannel.NewClientHandshaker(
+ testhelper.DiscardTestEntry(t),
+ func() backchannel.Server {
+ srv := grpc.NewServer()
+ gitalypb.RegisterRefTransactionServer(srv, &txServer)
+ return srv
+ },
+ ))
+
+ // The first invocation creates the repository via a snapshot given that it doesn't yet
+ // exist.
+ _, err = client.ReplicateRepository(ctx, &gitalypb.ReplicateRepositoryRequest{
+ Repository: targetRepo,
+ Source: sourceRepo,
+ })
+
+ require.NoError(t, err)
+ require.Equal(t, 1, votes)
+
+ // We're now changing a reference in the source repository such that we can observe changes
+ // in the target repo.
+ gittest.Exec(t, cfg, "-C", sourceRepoPath, "update-ref", "refs/heads/master", "refs/heads/master~")
+
+ votes = 0
+
+ // And the second invocation uses FetchInternalRemote.
+ _, err = client.ReplicateRepository(ctx, &gitalypb.ReplicateRepositoryRequest{
+ Repository: targetRepo,
+ Source: sourceRepo,
+ })
+
+ if featureflag.IsEnabled(ctx, featureflag.ReplicateRepositoryDirectFetch) {
+ require.NoError(t, err)
+ require.Equal(t, 2, votes)
+ } else {
+ // This is failing because we do a nested mutating RPC in `ReplicateRepository()` to
+ // `FetchInternalRemote()`. Because we simply pass along the incoming context as an
+ // outgoing one, the server would try to vote on the backchannel. But given that the
+ // connection is not to Praefect but to Gitaly now, it's trying to cast votes on a
+ // non-multiplexed Gitaly connection instead of against the expected Praefect peer.
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "ref updates aborted by hook")
+ require.Equal(t, 0, votes)
+ }
+}
+
func TestReplicateRepositoryInvalidArguments(t *testing.T) {
testCases := []struct {
description string
diff --git a/internal/metadata/featureflag/feature_flags.go b/internal/metadata/featureflag/feature_flags.go
index 3258c058c..44dbcadac 100644
--- a/internal/metadata/featureflag/feature_flags.go
+++ b/internal/metadata/featureflag/feature_flags.go
@@ -20,6 +20,10 @@ var (
// order to reduce the number of transactional votes.
CreateRepositoryFromBundleAtomicFetch = FeatureFlag{Name: "create_repository_from_bundle_atomic_fetch", OnByDefault: false}
ResolveConflictsWithHooks = FeatureFlag{Name: "resolve_conflicts_with_hooks", OnByDefault: false}
+ // ReplicateRepositoryDirectFetch will cause the ReplicateRepository RPC to perform fetches
+ // via a direct call instead of doing an RPC call to its own server. This fixes calls of
+ // `ReplicateRepository()` in case it's invoked via Praefect with transactions enabled.
+ ReplicateRepositoryDirectFetch = FeatureFlag{Name: "replicate_repository_direct_fetch", OnByDefault: false}
)
// All includes all feature flags.
@@ -29,4 +33,5 @@ var All = []FeatureFlag{
LFSPointersPipeline,
CreateRepositoryFromBundleAtomicFetch,
ResolveConflictsWithHooks,
+ ReplicateRepositoryDirectFetch,
}