diff options
-rw-r--r-- | internal/gitaly/service/repository/fetch_remote.go | 106 | ||||
-rw-r--r-- | internal/gitaly/service/repository/fetch_remote_test.go | 93 |
2 files changed, 23 insertions, 176 deletions
diff --git a/internal/gitaly/service/repository/fetch_remote.go b/internal/gitaly/service/repository/fetch_remote.go index 480001477..ca7de9fe6 100644 --- a/internal/gitaly/service/repository/fetch_remote.go +++ b/internal/gitaly/service/repository/fetch_remote.go @@ -4,19 +4,14 @@ import ( "bytes" "context" "fmt" - "io" "strings" "time" - "gitlab.com/gitlab-org/gitaly/v16/internal/featureflag" "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/git/quarantine" "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" - "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" - "gitlab.com/gitlab-org/gitaly/v16/internal/transaction/txinfo" - "gitlab.com/gitlab-org/gitaly/v16/internal/transaction/voting" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -31,13 +26,7 @@ func (s *server) FetchRemote(ctx context.Context, req *gitalypb.FetchRemoteReque defer cancel() } - var tagsChanged bool - var err error - if featureflag.AtomicFetchRemote.IsEnabled(ctx) { - tagsChanged, err = s.fetchRemoteAtomic(ctx, req) - } else { - tagsChanged, err = s.fetchRemote(ctx, req) - } + tagsChanged, err := s.fetchRemoteAtomic(ctx, req) if err != nil { return nil, err } @@ -228,82 +217,6 @@ func (s *server) fetchRemoteAtomic(ctx context.Context, req *gitalypb.FetchRemot return true, nil } -func (s *server) fetchRemote(ctx context.Context, req *gitalypb.FetchRemoteRequest) (bool, error) { - var stderr bytes.Buffer - opts := localrepo.FetchOpts{ - Stderr: &stderr, - Force: req.Force, - Prune: !req.NoPrune, - Tags: localrepo.FetchOptsTagsAll, - Verbose: req.GetCheckTagsChanged(), - DisableTransactions: true, - } - - if req.GetNoTags() { - opts.Tags = localrepo.FetchOptsTagsNone - } - - if err := buildCommandOpts(&opts, req); err != nil { - return false, err - } - - sshCommand, cleanup, err := git.BuildSSHInvocation(ctx, s.logger, req.GetSshKey(), req.GetKnownHosts()) - if err != nil { - return false, err - } - defer cleanup() - - opts.Env = append(opts.Env, "GIT_SSH_COMMAND="+sshCommand) - - repo := s.localrepo(req.GetRepository()) - remoteName := "inmemory" - - if err := repo.FetchRemote(ctx, remoteName, opts); err != nil { - errMsg := stderr.String() - if errMsg != "" { - return false, structerr.NewInternal("fetch remote: %q: %w", errMsg, err) - } - - return false, structerr.NewInternal("fetch remote: %w", err) - } - - // Ideally, we'd do the voting process via git-fetch(1) using the reference-transaction - // hook. But by default this would lead to one hook invocation per updated ref, which is - // infeasible performance-wise. While this could be fixed via the `--atomic` flag, that's - // not a solution either: we rely on the fact that refs get updated even if a subset of refs - // diverged, and with atomic transactions it would instead be an all-or-nothing operation. - // - // Instead, we do the second-best thing, which is to vote on the resulting references. This - // is of course racy and may conflict with other mutators, causing the vote to fail. But it - // is arguably preferable to accept races in favour always replicating. If loosing the race, - // we'd fail this RPC and schedule a replication job afterwards. - if err := transaction.RunOnContext(ctx, func(tx txinfo.Transaction) error { - hash := voting.NewVoteHash() - - if err := repo.ExecAndWait(ctx, git.Command{ - Name: "for-each-ref", - }, git.WithStdout(hash)); err != nil { - return fmt.Errorf("cannot compute references vote: %w", err) - } - - vote, err := hash.Vote() - if err != nil { - return err - } - - return s.txManager.Vote(ctx, tx, vote, voting.UnknownPhase) - }); err != nil { - return false, structerr.NewAborted("failed vote on refs: %w", err) - } - - tagsChanged := true - if req.GetCheckTagsChanged() { - tagsChanged = didTagsChange(&stderr) - } - - return tagsChanged, nil -} - func buildCommandOpts(opts *localrepo.FetchOpts, req *gitalypb.FetchRemoteRequest) error { remoteURL := req.GetRemoteParams().GetUrl() var config []git.ConfigPair @@ -338,23 +251,6 @@ func buildCommandOpts(opts *localrepo.FetchOpts, req *gitalypb.FetchRemoteReques return nil } -func didTagsChange(r io.Reader) bool { - scanner := git.NewFetchScanner(r) - for scanner.Scan() { - status := scanner.StatusLine() - - // We can't detect if tags have been deleted, but we never call fetch - // with --prune-tags at the moment, so it should never happen. - if status.IsTagAdded() || status.IsTagUpdated() { - return true - } - } - - // If the scanner fails for some reason, we don't know if tags changed, so - // assume they did for safety reasons. - return scanner.Err() != nil -} - func (s *server) validateFetchRemoteRequest(req *gitalypb.FetchRemoteRequest) error { if err := s.locator.ValidateRepository(req.GetRepository()); err != nil { return structerr.NewInvalidArgument("%w", err) diff --git a/internal/gitaly/service/repository/fetch_remote_test.go b/internal/gitaly/service/repository/fetch_remote_test.go index 76fdb7ee4..6f0c1ea17 100644 --- a/internal/gitaly/service/repository/fetch_remote_test.go +++ b/internal/gitaly/service/repository/fetch_remote_test.go @@ -2,7 +2,6 @@ package repository import ( "bytes" - "context" "fmt" "io" "net/http" @@ -12,7 +11,6 @@ import ( "testing" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v16/internal/featureflag" "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" @@ -51,11 +49,8 @@ manually using update-ref with the fetch being just a dry-run. Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/3780`) t.Parallel() - testhelper.NewFeatureSets(featureflag.AtomicFetchRemote).Run(t, testFetchRemote) -} -func testFetchRemote(t *testing.T, ctx context.Context) { - t.Parallel() + ctx := testhelper.Context(t) // Some of the tests require multiple calls to the clients each run struct // encompasses the expected data for a single run @@ -263,13 +258,10 @@ func testFetchRemote(t *testing.T, ctx context.Context) { expectedRefs: map[string]git.ObjectID{ "refs/heads/branch/conflict": commitID, }, - expectedErr: testhelper.EnabledOrDisabledFlag(ctx, featureflag.AtomicFetchRemote, - testhelper.WithInterceptedMetadataItems( - structerr.NewInternal("preparing reference update: file directory conflict"), - structerr.MetadataItem{Key: "conflicting_reference", Value: "refs/heads/branch"}, - structerr.MetadataItem{Key: "existing_reference", Value: "refs/heads/branch/conflict"}, - ), - structerr.NewInternal(`fetch remote: "error: cannot lock ref 'refs/heads/branch': 'refs/heads/branch/conflict' exists; cannot create 'refs/heads/branch'\nerror: some local refs could not be updated; try running\n 'git remote prune inmemory' to remove any old, conflicting branches\n": exit status 1`), + expectedErr: testhelper.WithInterceptedMetadataItems( + structerr.NewInternal("preparing reference update: file directory conflict"), + structerr.MetadataItem{Key: "conflicting_reference", Value: "refs/heads/branch"}, + structerr.MetadataItem{Key: "existing_reference", Value: "refs/heads/branch/conflict"}, ), }, }, @@ -328,13 +320,10 @@ func testFetchRemote(t *testing.T, ctx context.Context) { expectedRefs: map[string]git.ObjectID{ "refs/heads/branch": commitID, }, - expectedErr: testhelper.EnabledOrDisabledFlag(ctx, featureflag.AtomicFetchRemote, - testhelper.WithInterceptedMetadataItems( - structerr.NewInternal("preparing reference update: file directory conflict"), - structerr.MetadataItem{Key: "conflicting_reference", Value: "refs/heads/branch/conflict"}, - structerr.MetadataItem{Key: "existing_reference", Value: "refs/heads/branch"}, - ), - structerr.NewInternal(`fetch remote: "error: cannot lock ref 'refs/heads/branch/conflict': 'refs/heads/branch' exists; cannot create 'refs/heads/branch/conflict'\nerror: some local refs could not be updated; try running\n 'git remote prune inmemory' to remove any old, conflicting branches\n": exit status 1`), + expectedErr: testhelper.WithInterceptedMetadataItems( + structerr.NewInternal("preparing reference update: file directory conflict"), + structerr.MetadataItem{Key: "conflicting_reference", Value: "refs/heads/branch/conflict"}, + structerr.MetadataItem{Key: "existing_reference", Value: "refs/heads/branch"}, ), }, }, @@ -487,15 +476,9 @@ func testFetchRemote(t *testing.T, ctx context.Context) { }, runs: []run{ { - expectedRefs: map[string]git.ObjectID{"refs/heads/master": commitID}, - expectedResponse: testhelper.EnabledOrDisabledFlag(ctx, featureflag.AtomicFetchRemote, - &gitalypb.FetchRemoteResponse{TagsChanged: true}, - nil, - ), - expectedErr: testhelper.EnabledOrDisabledFlag(ctx, featureflag.AtomicFetchRemote, - nil, - error(structerr.NewInternal("fetch remote: exit status 1")), - ), + expectedRefs: map[string]git.ObjectID{"refs/heads/master": commitID}, + expectedResponse: &gitalypb.FetchRemoteResponse{TagsChanged: true}, + expectedErr: nil, }, }, } @@ -558,14 +541,8 @@ func testFetchRemote(t *testing.T, ctx context.Context) { "refs/heads/master": remoteCommitID, "refs/tags/v1": commitID, }, - expectedResponse: testhelper.EnabledOrDisabledFlag(ctx, featureflag.AtomicFetchRemote, - &gitalypb.FetchRemoteResponse{TagsChanged: true}, - nil, - ), - expectedErr: testhelper.EnabledOrDisabledFlag(ctx, featureflag.AtomicFetchRemote, - nil, - error(structerr.NewInternal("fetch remote: exit status 1")), - ), + expectedResponse: &gitalypb.FetchRemoteResponse{TagsChanged: true}, + expectedErr: nil, }, }, } @@ -677,14 +654,8 @@ func testFetchRemote(t *testing.T, ctx context.Context) { "refs/heads/main": localDivergingID, "refs/heads/branch": remoteUpdatedID, }, - expectedResponse: testhelper.EnabledOrDisabledFlag(ctx, featureflag.AtomicFetchRemote, - &gitalypb.FetchRemoteResponse{TagsChanged: true}, - nil, - ), - expectedErr: testhelper.EnabledOrDisabledFlag(ctx, featureflag.AtomicFetchRemote, - nil, - error(structerr.NewInternal("fetch remote: exit status 1")), - ), + expectedResponse: &gitalypb.FetchRemoteResponse{TagsChanged: true}, + expectedErr: nil, }, }, } @@ -721,14 +692,8 @@ func testFetchRemote(t *testing.T, ctx context.Context) { "refs/heads/main": localDivergingID, "refs/tags/v1.0.0": remoteTagID, }, - expectedResponse: testhelper.EnabledOrDisabledFlag(ctx, featureflag.AtomicFetchRemote, - &gitalypb.FetchRemoteResponse{TagsChanged: true}, - nil, - ), - expectedErr: testhelper.EnabledOrDisabledFlag(ctx, featureflag.AtomicFetchRemote, - nil, - error(structerr.NewInternal("fetch remote: exit status 1")), - ), + expectedResponse: &gitalypb.FetchRemoteResponse{TagsChanged: true}, + expectedErr: nil, }, }, } @@ -986,12 +951,8 @@ func testFetchRemote(t *testing.T, ctx context.Context) { func TestFetchRemote_sshCommand(t *testing.T) { t.Parallel() - testhelper.NewFeatureSets(featureflag.AtomicFetchRemote).Run(t, testFetchRemoteSSHCommand) -} - -func testFetchRemoteSSHCommand(t *testing.T, ctx context.Context) { - t.Parallel() + ctx := testhelper.Context(t) cfg := testcfg.Build(t) outputPath := filepath.Join(testhelper.TempDir(t), "output") @@ -1070,11 +1031,8 @@ func testFetchRemoteSSHCommand(t *testing.T, ctx context.Context) { func TestFetchRemote_transaction(t *testing.T) { t.Parallel() - testhelper.NewFeatureSets(featureflag.AtomicFetchRemote).Run(t, testFetchRemoteTransaction) -} -func testFetchRemoteTransaction(t *testing.T, ctx context.Context) { - t.Parallel() + ctx := testhelper.Context(t) remoteCfg := testcfg.Build(t) _, remoteRepoPath := gittest.CreateRepository(t, ctx, remoteCfg, gittest.CreateRepositoryConfig{ @@ -1107,11 +1065,7 @@ func testFetchRemoteTransaction(t *testing.T, ctx context.Context) { }) require.NoError(t, err) - if featureflag.AtomicFetchRemote.IsEnabled(ctx) { - require.Equal(t, testhelper.GitalyOrPraefect(2, 4), len(txManager.Votes())) - } else { - require.Equal(t, testhelper.GitalyOrPraefect(1, 3), len(txManager.Votes())) - } + require.Equal(t, testhelper.GitalyOrPraefect(2, 4), len(txManager.Votes())) } func TestFetchRemote_pooledRepository(t *testing.T) { @@ -1119,11 +1073,8 @@ func TestFetchRemote_pooledRepository(t *testing.T) { Object pools are not yet supported with transaction management.`) t.Parallel() - testhelper.NewFeatureSets(featureflag.AtomicFetchRemote).Run(t, testFetchRemotePooledRepository) -} -func testFetchRemotePooledRepository(t *testing.T, ctx context.Context) { - t.Parallel() + ctx := testhelper.Context(t) // By default git-fetch(1) will always run with `core.alternateRefsCommand=exit 0 #`, which // effectively disables use of alternate refs. We can't just unset this value, so instead we |