diff options
author | Quang-Minh Nguyen <qmnguyen@gitlab.com> | 2023-10-20 11:23:07 +0300 |
---|---|---|
committer | Quang-Minh Nguyen <qmnguyen@gitlab.com> | 2023-10-20 11:23:07 +0300 |
commit | 5f813c45f52e29875d3472dd119f78e1839910c8 (patch) | |
tree | db0dcbdd4b4c090decf7a50d4b22e446cd792ae4 | |
parent | 5bed5dbafee7fe74e2e7f9e0e4c7ccde6570eedf (diff) | |
parent | d97213a565069016793801f1cd20b1edc45c6931 (diff) |
Merge branch 'jt-atomic-fetch-remote' into 'master'
repository: Update references atomically in `FetchRemote`
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6432
Merged-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Approved-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Reviewed-by: Sami Hiltunen <shiltunen@gitlab.com>
Reviewed-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Reviewed-by: Justin Tobler <jtobler@gitlab.com>
Co-authored-by: Justin Tobler <jtobler@gitlab.com>
-rw-r--r-- | internal/featureflag/ff_atomic_fetch_remote.go | 9 | ||||
-rw-r--r-- | internal/git/fetch_scanner.go | 102 | ||||
-rw-r--r-- | internal/git/fetch_scanner_test.go | 184 | ||||
-rw-r--r-- | internal/git/localrepo/remote.go | 17 | ||||
-rw-r--r-- | internal/git/localrepo/remote_test.go | 47 | ||||
-rw-r--r-- | internal/gitaly/service/repository/fetch_remote.go | 291 | ||||
-rw-r--r-- | internal/gitaly/service/repository/fetch_remote_test.go | 88 |
7 files changed, 686 insertions, 52 deletions
diff --git a/internal/featureflag/ff_atomic_fetch_remote.go b/internal/featureflag/ff_atomic_fetch_remote.go new file mode 100644 index 000000000..273b0c379 --- /dev/null +++ b/internal/featureflag/ff_atomic_fetch_remote.go @@ -0,0 +1,9 @@ +package featureflag + +// AtomicFetchRemote enables atomic transactions for the `FetchRemote` RPC. +var AtomicFetchRemote = NewFeatureFlag( + "atomic_fetch_remote", + "v16.5.0", + "https://gitlab.com/gitlab-org/gitaly/-/issues/5640", + false, +) diff --git a/internal/git/fetch_scanner.go b/internal/git/fetch_scanner.go index 4a546de43..2154a1f48 100644 --- a/internal/git/fetch_scanner.go +++ b/internal/git/fetch_scanner.go @@ -3,6 +3,8 @@ package git import ( "bufio" "bytes" + "errors" + "fmt" "io" "strings" ) @@ -189,3 +191,103 @@ func parseFetchStatusLine(line []byte) (FetchStatusLine, bool) { return out, true } + +// FetchPorcelainStatusLine represents a line of status output from `git fetch` when the porcelain +// option is enabled. +type FetchPorcelainStatusLine struct { + Type RefUpdateType + OldOID ObjectID + NewOID ObjectID + Reference string +} + +// FetchPorcelainScanner scans the output of `git fetch` when the porcelain option is enabled, +// allowing information about references to be gathered. +type FetchPorcelainScanner struct { + scanner *bufio.Scanner + lastLine FetchPorcelainStatusLine + hash ObjectHash + err error +} + +// NewFetchPorcelainScanner returns a FetchPorcelainScanner. +func NewFetchPorcelainScanner(r io.Reader, hash ObjectHash) *FetchPorcelainScanner { + return &FetchPorcelainScanner{scanner: bufio.NewScanner(r), hash: hash} +} + +// Scan parses the next fetch status line from the fetch command output. +func (f *FetchPorcelainScanner) Scan() bool { + if f.scanner.Scan() { + line, err := parseFetchPorcelainStatusLine(f.scanner.Bytes(), f.hash) + if err != nil { + f.err = err + return false + } + + f.lastLine = line + return true + } + + if f.scanner.Err() != nil { + f.err = f.scanner.Err() + } + + return false +} + +// Err returns any error encountered while scanning. +func (f *FetchPorcelainScanner) Err() error { + return f.err +} + +// StatusLine returns the status line information from the last scanned line. +func (f *FetchPorcelainScanner) StatusLine() FetchPorcelainStatusLine { + return f.lastLine +} + +// parseFetchPorcelainStatusLine parses a fetch status line. +// The line format is as follows: <flag> <old-object-id> <new-object-id> <local-reference> +// Each token is delimited by a single space and each line ends with a new-line. Further format +// documentation can be found at https://git-scm.com/docs/git-fetch#_output. +func parseFetchPorcelainStatusLine(line []byte, hash ObjectHash) (FetchPorcelainStatusLine, error) { + var status FetchPorcelainStatusLine + + // Unfortunately, the reference update type must be checked and parsed first. This is because + // one of the possible values of the reference update type is <space>, which is also the same + // character used as the delimiter for the remaining tokens in the status line. Therefore, the + // line length is checked to make sure there will not be an out-of-bounds error when checking + // for the remaining tokens and the update reference type is parsed/validated. + if len(line) < 3 || line[1] != ' ' { + return FetchPorcelainStatusLine{}, errors.New("invalid status line") + } + + // First character in the line represents the status of the reference. + status.Type = RefUpdateType(line[0]) + if !status.Type.Valid() { + return FetchPorcelainStatusLine{}, fmt.Errorf("invalid reference update type: %q", status.Type) + } + + // The remaining tokens in the status line can be safely split by the <space> delimiter. + refStatus := bytes.Split(line[2:], []byte(" ")) + if len(refStatus) != 3 { + return FetchPorcelainStatusLine{}, errors.New("invalid status line") + } + + var err error + status.OldOID, err = hash.FromHex(string(refStatus[0])) + if err != nil { + return FetchPorcelainStatusLine{}, fmt.Errorf("constructing old OID: %w", err) + } + + status.NewOID, err = hash.FromHex(string(refStatus[1])) + if err != nil { + return FetchPorcelainStatusLine{}, fmt.Errorf("constructing new OID: %w", err) + } + + status.Reference = string(refStatus[2]) + if err := ValidateReference(status.Reference); err != nil { + return FetchPorcelainStatusLine{}, fmt.Errorf("validating reference: %w", err) + } + + return status, nil +} diff --git a/internal/git/fetch_scanner_test.go b/internal/git/fetch_scanner_test.go index d49feda93..d343f5587 100644 --- a/internal/git/fetch_scanner_test.go +++ b/internal/git/fetch_scanner_test.go @@ -1,6 +1,7 @@ package git import ( + "errors" "strings" "testing" @@ -8,6 +9,8 @@ import ( ) func TestFetchScannerScan(t *testing.T) { + t.Parallel() + blank := FetchStatusLine{} for _, tc := range []struct { @@ -122,7 +125,10 @@ func TestFetchScannerScan(t *testing.T) { success: true, }, } { + tc := tc t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + scanner := NewFetchScanner(strings.NewReader(tc.data)) require.Equal(t, tc.success, scanner.Scan()) require.Equal(t, tc.expected, scanner.StatusLine()) @@ -131,3 +137,181 @@ func TestFetchScannerScan(t *testing.T) { }) } } + +func TestFetchPorcelainScannerScan(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + desc string + data string + hash ObjectHash + expectedStatus FetchPorcelainStatusLine + expectedError error + success bool + }{ + { + desc: "empty line", + data: "", + expectedStatus: FetchPorcelainStatusLine{}, + }, + { + desc: "blank line", + data: " ", + hash: ObjectHashSHA1, + expectedStatus: FetchPorcelainStatusLine{}, + expectedError: errors.New("invalid status line"), + }, + { + desc: "invalid flag", + data: "? 0000000000000000000000000000000000000000 0000000000000000000000000000000000000001 refs/heads/main", + hash: ObjectHashSHA1, + expectedStatus: FetchPorcelainStatusLine{}, + expectedError: errors.New("invalid reference update type: '?'"), + }, + { + desc: "valid fast-forward status", + data: " 0000000000000000000000000000000000000000 0000000000000000000000000000000000000001 refs/heads/main", + hash: ObjectHashSHA1, + expectedStatus: FetchPorcelainStatusLine{ + Type: RefUpdateTypeFastForwardUpdate, + OldOID: ObjectID("0000000000000000000000000000000000000000"), + NewOID: ObjectID("0000000000000000000000000000000000000001"), + Reference: "refs/heads/main", + }, + success: true, + }, + { + desc: "valid forced status", + data: "+ 0000000000000000000000000000000000000000 0000000000000000000000000000000000000001 refs/heads/main", + hash: ObjectHashSHA1, + expectedStatus: FetchPorcelainStatusLine{ + Type: RefUpdateTypeForcedUpdate, + OldOID: ObjectID("0000000000000000000000000000000000000000"), + NewOID: ObjectID("0000000000000000000000000000000000000001"), + Reference: "refs/heads/main", + }, + success: true, + }, + { + desc: "valid pruned status", + data: "- 0000000000000000000000000000000000000000 0000000000000000000000000000000000000001 refs/heads/main", + hash: ObjectHashSHA1, + expectedStatus: FetchPorcelainStatusLine{ + Type: RefUpdateTypePruned, + OldOID: ObjectID("0000000000000000000000000000000000000000"), + NewOID: ObjectID("0000000000000000000000000000000000000001"), + Reference: "refs/heads/main", + }, + success: true, + }, + { + desc: "valid tag status", + data: "t 0000000000000000000000000000000000000000 0000000000000000000000000000000000000001 refs/heads/main", + hash: ObjectHashSHA1, + expectedStatus: FetchPorcelainStatusLine{ + Type: RefUpdateTypeTagUpdate, + OldOID: ObjectID("0000000000000000000000000000000000000000"), + NewOID: ObjectID("0000000000000000000000000000000000000001"), + Reference: "refs/heads/main", + }, + success: true, + }, + { + desc: "valid fetched status", + data: "* 0000000000000000000000000000000000000000 0000000000000000000000000000000000000001 refs/heads/main", + hash: ObjectHashSHA1, + expectedStatus: FetchPorcelainStatusLine{ + Type: RefUpdateTypeFetched, + OldOID: ObjectID("0000000000000000000000000000000000000000"), + NewOID: ObjectID("0000000000000000000000000000000000000001"), + Reference: "refs/heads/main", + }, + success: true, + }, + { + desc: "valid rejected status", + data: "! 0000000000000000000000000000000000000000 0000000000000000000000000000000000000001 refs/heads/main", + hash: ObjectHashSHA1, + expectedStatus: FetchPorcelainStatusLine{ + Type: RefUpdateTypeUpdateFailed, + OldOID: ObjectID("0000000000000000000000000000000000000000"), + NewOID: ObjectID("0000000000000000000000000000000000000001"), + Reference: "refs/heads/main", + }, + success: true, + }, + { + desc: "valid up-to-date status", + data: "= 0000000000000000000000000000000000000000 0000000000000000000000000000000000000001 refs/heads/main", + hash: ObjectHashSHA1, + expectedStatus: FetchPorcelainStatusLine{ + Type: RefUpdateTypeUnchanged, + OldOID: ObjectID("0000000000000000000000000000000000000000"), + NewOID: ObjectID("0000000000000000000000000000000000000001"), + Reference: "refs/heads/main", + }, + success: true, + }, + { + desc: "invalid sha1 old OID", + data: " 0 0000000000000000000000000000000000000001 refs/heads/main", + hash: ObjectHashSHA1, + expectedStatus: FetchPorcelainStatusLine{}, + expectedError: errors.New("constructing old OID: invalid object ID: \"0\", expected length 40, got 1"), + }, + { + desc: "invalid sha1 new OID", + data: " 0000000000000000000000000000000000000000 1 refs/heads/main", + hash: ObjectHashSHA1, + expectedStatus: FetchPorcelainStatusLine{}, + expectedError: errors.New("constructing new OID: invalid object ID: \"1\", expected length 40, got 1"), + }, + { + desc: "valid sha256 status", + data: " 0000000000000000000000000000000000000000000000000000000000000000 0000000000000000000000000000000000000000000000000000000000000001 refs/heads/main", + hash: ObjectHashSHA256, + expectedStatus: FetchPorcelainStatusLine{ + Type: RefUpdateTypeFastForwardUpdate, + OldOID: ObjectID("0000000000000000000000000000000000000000000000000000000000000000"), + NewOID: ObjectID("0000000000000000000000000000000000000000000000000000000000000001"), + Reference: "refs/heads/main", + }, + success: true, + }, + { + desc: "invalid sha256 old OID", + data: " 0 0000000000000000000000000000000000000000000000000000000000000001 refs/heads/main", + hash: ObjectHashSHA256, + expectedStatus: FetchPorcelainStatusLine{}, + expectedError: errors.New("constructing old OID: invalid object ID: \"0\", expected length 64, got 1"), + }, + { + desc: "invalid sha256 new OID", + data: " 0000000000000000000000000000000000000000000000000000000000000000 1 refs/heads/main", + hash: ObjectHashSHA256, + expectedStatus: FetchPorcelainStatusLine{}, + expectedError: errors.New("constructing new OID: invalid object ID: \"1\", expected length 64, got 1"), + }, + { + desc: "invalid reference", + data: " 0000000000000000000000000000000000000000 0000000000000000000000000000000000000001 main", + hash: ObjectHashSHA1, + expectedStatus: FetchPorcelainStatusLine{}, + expectedError: errors.New("validating reference: reference is not fully qualified"), + }, + } { + tc := tc + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + + scanner := NewFetchPorcelainScanner(strings.NewReader(tc.data), tc.hash) + require.Equal(t, tc.success, scanner.Scan()) + require.Equal(t, tc.expectedStatus, scanner.StatusLine()) + if tc.expectedError != nil { + require.EqualError(t, scanner.err, tc.expectedError.Error()) + } else { + require.NoError(t, tc.expectedError) + } + }) + } +} diff --git a/internal/git/localrepo/remote.go b/internal/git/localrepo/remote.go index d6b6660c9..696512afb 100644 --- a/internal/git/localrepo/remote.go +++ b/internal/git/localrepo/remote.go @@ -50,6 +50,14 @@ type FetchOpts struct { // https://git-scm.com/docs/git-fetch#Documentation/git-fetch.txt---tags // https://git-scm.com/docs/git-fetch#Documentation/git-fetch.txt---no-tags Tags FetchOptsTags + // DryRun, if enabled, performs the `git-fetch(1)` command without updating any references. + DryRun bool + // Porcelain controls `git-fetch(1)` command output and when enabled prints output in an + // easy-to-parse format. By default, `git-fetch(1)` output is suppressed by the `--quiet` flag. + // Therefore, the Verbose option must also be enabled to receive output. + Porcelain bool + // Stdout if set it would be used to redirect stdout stream into it. + Stdout io.Writer // Stderr if set it would be used to redirect stderr stream into it. Stderr io.Writer // DisableTransactions will disable the reference-transaction hook and atomic transactions. @@ -80,6 +88,7 @@ func (repo *Repo) FetchRemote(ctx context.Context, remoteName string, opts Fetch commandOptions := []git.CmdOpt{ git.WithEnv(opts.Env...), + git.WithStdout(opts.Stdout), git.WithStderr(opts.Stderr), git.WithConfig(git.ConfigPair{ // Git is so kind to point out that we asked it to not show forced updates @@ -198,6 +207,14 @@ func (opts FetchOpts) buildFlags() []git.Option { flags = append(flags, git.Flag{Name: "--atomic"}) } + if opts.DryRun { + flags = append(flags, git.Flag{Name: "--dry-run"}) + } + + if opts.Porcelain { + flags = append(flags, git.Flag{Name: "--porcelain"}) + } + // Even if we ask Git to not print any output and to force-update branches it will still // compute whether branches have been force-updated only to discard that information again. // Let's ask it not to given that this check can be quite expensive. diff --git a/internal/git/localrepo/remote_test.go b/internal/git/localrepo/remote_test.go index c27c56fef..138428253 100644 --- a/internal/git/localrepo/remote_test.go +++ b/internal/git/localrepo/remote_test.go @@ -172,6 +172,53 @@ func TestRepo_FetchRemote(t *testing.T) { require.False(t, contains, "remote tracking branch should be pruned as it no longer exists on the remote") }) + t.Run("with dry-run", func(t *testing.T) { + repo, _ := initBareWithRemote(t, "origin") + + var stderr bytes.Buffer + require.NoError(t, repo.FetchRemote(ctx, "origin", FetchOpts{Stderr: &stderr, DryRun: true})) + + require.Empty(t, stderr.String(), "it should not produce output as it is called with --quiet flag by default") + + // Prior to the fetch, the repository did not contain any references. Consequently, we do + // not expect the repository to contain any references because the fetch performed was a + // dry-run. + refs, err := repo.GetReferences(ctx) + require.NoError(t, err) + require.Len(t, refs, 0) + }) + + t.Run("with porcelain", func(t *testing.T) { + repo, _ := initBareWithRemote(t, "origin") + + // The porcelain fetch option write output to stdout in an easy-to-parse format. By default, + // output is suppressed by the --quiet flag. The Verbose option must also be enabled to + // receive output. + var stdout bytes.Buffer + require.NoError(t, repo.FetchRemote(ctx, "origin", FetchOpts{ + Stdout: &stdout, + Porcelain: true, + Verbose: true, + })) + + hash, err := repo.ObjectHash(ctx) + require.NoError(t, err) + scanner := git.NewFetchPorcelainScanner(&stdout, hash) + + // Scan the output for expected references. + require.True(t, scanner.Scan()) + require.Equal(t, git.RefUpdateTypeFetched, scanner.StatusLine().Type) + require.Equal(t, "refs/remotes/origin/main", scanner.StatusLine().Reference) + require.True(t, scanner.Scan()) + require.Equal(t, git.RefUpdateTypeFetched, scanner.StatusLine().Type) + require.Equal(t, "refs/tags/v1.0.0", scanner.StatusLine().Reference) + + // Since the remote only contains two references, there should be nothing left in the buffer + // to scan. + require.False(t, scanner.Scan()) + require.Nil(t, scanner.Err()) + }) + t.Run("with no tags", func(t *testing.T) { repo, testRepoPath := initBareWithRemote(t, "origin") diff --git a/internal/gitaly/service/repository/fetch_remote.go b/internal/gitaly/service/repository/fetch_remote.go index ae041e265..480001477 100644 --- a/internal/gitaly/service/repository/fetch_remote.go +++ b/internal/gitaly/service/repository/fetch_remote.go @@ -5,10 +5,14 @@ import ( "context" "fmt" "io" + "strings" "time" + "gitlab.com/gitlab-org/gitaly/v16/internal/featureflag" "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/quarantine" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/internal/transaction/txinfo" @@ -21,73 +25,246 @@ func (s *server) FetchRemote(ctx context.Context, req *gitalypb.FetchRemoteReque return nil, err } - var stderr bytes.Buffer + if req.GetTimeout() > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, time.Duration(req.GetTimeout())*time.Second) + defer cancel() + } + + var tagsChanged bool + var err error + if featureflag.AtomicFetchRemote.IsEnabled(ctx) { + tagsChanged, err = s.fetchRemoteAtomic(ctx, req) + } else { + tagsChanged, err = s.fetchRemote(ctx, req) + } + if err != nil { + return nil, err + } + + return &gitalypb.FetchRemoteResponse{TagsChanged: tagsChanged}, nil +} + +// fetchRemoteAtomic fetches changes from the specified remote repository. To be atomic, fetched +// objects are first quarantined and only migrated before committing the reference transaction. +func (s *server) fetchRemoteAtomic(ctx context.Context, req *gitalypb.FetchRemoteRequest) (_ bool, returnedErr error) { + var stdout, stderr bytes.Buffer opts := localrepo.FetchOpts{ - Stderr: &stderr, - Force: req.Force, - Prune: !req.NoPrune, - Tags: localrepo.FetchOptsTagsAll, - Verbose: req.GetCheckTagsChanged(), + Stdout: &stdout, + Stderr: &stderr, + Force: req.Force, + Prune: !req.NoPrune, + Tags: localrepo.FetchOptsTagsAll, + Verbose: true, + // Transactions are disabled during fetch operation because no references are updated when + // the dry-run option is enabled. Instead, the reference-transaction hook is performed + // during the subsequent execution of `git-update-ref(1)`. DisableTransactions: true, + // When the `dry-run` option is used with `git-fetch(1)`, Git objects are received without + // performing reference updates. This is used to quarantine objects on the initial fetch and + // migration to occur only during reference update. + DryRun: true, + // The `porcelain` option outputs reference update information from `git-fetch(1) to stdout. + // Since references are not updated during a `git-fetch(1)` dry-run, the reference + // information is used during `git-update-ref(1)` execution to update the appropriate + // corresponding references. + Porcelain: true, } if req.GetNoTags() { opts.Tags = localrepo.FetchOptsTagsNone } - repo := s.localrepo(req.GetRepository()) - remoteName := "inmemory" - remoteURL := req.GetRemoteParams().GetUrl() - var config []git.ConfigPair + if err := buildCommandOpts(&opts, req); err != nil { + return false, err + } - for _, refspec := range s.getRefspecs(req.GetRemoteParams().GetMirrorRefmaps()) { - config = append(config, git.ConfigPair{ - Key: "remote.inmemory.fetch", Value: refspec, - }) + sshCommand, cleanup, err := git.BuildSSHInvocation(ctx, s.logger, req.GetSshKey(), req.GetKnownHosts()) + if err != nil { + return false, err } + defer cleanup() - if resolvedAddress := req.GetRemoteParams().GetResolvedAddress(); resolvedAddress != "" { - modifiedURL, resolveConfig, err := git.GetURLAndResolveConfig(remoteURL, resolvedAddress) - if err != nil { - return nil, fmt.Errorf("couldn't get curloptResolve config: %w", err) + opts.Env = append(opts.Env, "GIT_SSH_COMMAND="+sshCommand) + + // When performing fetch, objects are received before references are updated. If references fail + // to be updated, unreachable objects could be left in the repository that would need to be + // garbage collected. To be more atomic, a quarantine directory is set up where objects will be + // fetched prior to being migrated to the main repository when reference updates are committed. + quarantineDir, err := quarantine.New(ctx, req.GetRepository(), s.logger, s.locator) + if err != nil { + return false, fmt.Errorf("creating quarantine directory: %w", err) + } + + quarantineRepo := s.localrepo(quarantineDir.QuarantinedRepo()) + if err := quarantineRepo.FetchRemote(ctx, "inmemory", opts); err != nil { + // When `git-fetch(1)` fails to apply all reference updates successfully, the command + // returns `exit status 1`. Despite this error, successful reference updates should still be + // applied during the subsequent `git-update-ref(1)`. To differentiate between regular + // errors and failed reference updates, stderr is checked for an error message. If an error + // message is present, it is determined that an error occurred and the operation halts. + errMsg := stderr.String() + if errMsg != "" { + return false, structerr.NewInternal("fetch remote: %q: %w", errMsg, err) } - remoteURL = modifiedURL - config = append(config, resolveConfig...) + // Some errors during the `git-fetch(1)` operation do not print to stderr. If the error + // message is not `exit status 1`, it is determined that the error is unrelated to failed + // reference updates and the operation halts. Otherwise, it is assumed the error is from a + // failed reference update and the operation proceeds to update references. + if err.Error() != "exit status 1" { + return false, structerr.NewInternal("fetch remote: %w", err) + } } - config = append(config, git.ConfigPair{Key: "remote.inmemory.url", Value: remoteURL}) + // A repository cannot contain references with F/D (file/directory) conflicts (i.e. + // `refs/heads/foo` and `refs/heads/foo/bar`). If fetching from the remote repository + // results in an F/D conflict, the reference update fails. In some cases a conflicting + // reference may exist locally that does not exist on the remote. In this scenario, if + // outdated references are first pruned locally, the F/D conflict can be avoided. When + // `git-fetch(1)` is performed with the `--prune` and `--dry-run` flags, the pruned + // references are also included in the output without performing any actual reference + // updates. Bulk atomic reference updates performed by `git-update-ref(1)` do not support + // F/D conflicts even if the conflicted reference is being pruned. Therefore, pruned + // references must be updated first in a separate transaction. To accommodate this, two + // different instances of `updateref.Updater` are used to keep the transactions separate. + prunedUpdater, err := updateref.New(ctx, quarantineRepo) + if err != nil { + return false, fmt.Errorf("spawning pruned updater: %w", err) + } + defer func() { + if err := prunedUpdater.Close(); err != nil && returnedErr == nil { + returnedErr = fmt.Errorf("cancel pruned updater: %w", err) + } + }() - if authHeader := req.GetRemoteParams().GetHttpAuthorizationHeader(); authHeader != "" { - config = append(config, git.ConfigPair{ - Key: fmt.Sprintf("http.%s.extraHeader", req.GetRemoteParams().GetUrl()), - Value: "Authorization: " + authHeader, - }) + // All other reference updates can be queued as part of the same transaction. + refUpdater, err := updateref.New(ctx, quarantineRepo) + if err != nil { + return false, fmt.Errorf("spawning ref updater: %w", err) } + defer func() { + if err := refUpdater.Close(); err != nil && returnedErr == nil { + returnedErr = fmt.Errorf("cancel ref updater: %w", err) + } + }() - opts.CommandOptions = append(opts.CommandOptions, git.WithConfigEnv(config...)) + if err := prunedUpdater.Start(); err != nil { + return false, fmt.Errorf("start reference transaction: %w", err) + } + + if err := refUpdater.Start(); err != nil { + return false, fmt.Errorf("start reference transaction: %w", err) + } + + objectHash, err := quarantineRepo.ObjectHash(ctx) + if err != nil { + return false, fmt.Errorf("detecting object hash: %w", err) + } + + var tagsChanged bool + + // Parse stdout to identify required reference updates. Reference updates are queued to the + // respective updater based on type. + scanner := git.NewFetchPorcelainScanner(&stdout, objectHash) + for scanner.Scan() { + status := scanner.StatusLine() + + switch status.Type { + // Failed and unchanged reference updates do not need to be applied. + case git.RefUpdateTypeUpdateFailed, git.RefUpdateTypeUnchanged: + // Queue pruned references in a separate transaction to avoid F/D conflicts. + case git.RefUpdateTypePruned: + if err := prunedUpdater.Delete(git.ReferenceName(status.Reference)); err != nil { + return false, fmt.Errorf("queueing pruned ref for deletion: %w", err) + } + // Queue all other reference updates in the same transaction. + default: + if err := refUpdater.Update(git.ReferenceName(status.Reference), status.NewOID, status.OldOID); err != nil { + return false, fmt.Errorf("queueing ref to be updated: %w", err) + } + + // While scanning reference updates, check if any tags changed. + if status.Type == git.RefUpdateTypeTagUpdate || (status.Type == git.RefUpdateTypeFetched && strings.HasPrefix(status.Reference, "refs/tags")) { + tagsChanged = true + } + } + } + if scanner.Err() != nil { + return false, fmt.Errorf("scanning fetch output: %w", scanner.Err()) + } + + // Prepare pruned references in separate transaction to avoid F/D conflicts. + if err := prunedUpdater.Prepare(); err != nil { + return false, fmt.Errorf("preparing reference prune: %w", err) + } + + // Commit pruned references to complete transaction and apply changes. + if err := prunedUpdater.Commit(); err != nil { + return false, fmt.Errorf("committing reference prune: %w", err) + } + + // Prepare the remaining queued reference updates. + if err := refUpdater.Prepare(); err != nil { + return false, fmt.Errorf("preparing reference update: %w", err) + } + + // Before committing the remaining reference updates, fetched objects must be migrated out of + // the quarantine directory. + if err := quarantineDir.Migrate(); err != nil { + return false, fmt.Errorf("migrating quarantined objects: %w", err) + } + + // Commit the remaining queued reference updates so the changes get applied. + if err := refUpdater.Commit(); err != nil { + return false, fmt.Errorf("committing reference update: %w", err) + } + + if req.GetCheckTagsChanged() { + return tagsChanged, nil + } + + // If the request does not specify to check if tags changed, return true as the default value. + return true, nil +} + +func (s *server) fetchRemote(ctx context.Context, req *gitalypb.FetchRemoteRequest) (bool, error) { + var stderr bytes.Buffer + opts := localrepo.FetchOpts{ + Stderr: &stderr, + Force: req.Force, + Prune: !req.NoPrune, + Tags: localrepo.FetchOptsTagsAll, + Verbose: req.GetCheckTagsChanged(), + DisableTransactions: true, + } + + if req.GetNoTags() { + opts.Tags = localrepo.FetchOptsTagsNone + } + + if err := buildCommandOpts(&opts, req); err != nil { + return false, err + } sshCommand, cleanup, err := git.BuildSSHInvocation(ctx, s.logger, req.GetSshKey(), req.GetKnownHosts()) if err != nil { - return nil, err + return false, err } defer cleanup() opts.Env = append(opts.Env, "GIT_SSH_COMMAND="+sshCommand) - if req.GetTimeout() > 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, time.Duration(req.GetTimeout())*time.Second) - defer cancel() - } + repo := s.localrepo(req.GetRepository()) + remoteName := "inmemory" if err := repo.FetchRemote(ctx, remoteName, opts); err != nil { errMsg := stderr.String() if errMsg != "" { - return nil, structerr.NewInternal("fetch remote: %q: %w", errMsg, err) + return false, structerr.NewInternal("fetch remote: %q: %w", errMsg, err) } - return nil, structerr.NewInternal("fetch remote: %w", err) + return false, structerr.NewInternal("fetch remote: %w", err) } // Ideally, we'd do the voting process via git-fetch(1) using the reference-transaction @@ -116,15 +293,49 @@ func (s *server) FetchRemote(ctx context.Context, req *gitalypb.FetchRemoteReque return s.txManager.Vote(ctx, tx, vote, voting.UnknownPhase) }); err != nil { - return nil, structerr.NewAborted("failed vote on refs: %w", err) + return false, structerr.NewAborted("failed vote on refs: %w", err) } - out := &gitalypb.FetchRemoteResponse{TagsChanged: true} + tagsChanged := true if req.GetCheckTagsChanged() { - out.TagsChanged = didTagsChange(&stderr) + tagsChanged = didTagsChange(&stderr) + } + + return tagsChanged, nil +} + +func buildCommandOpts(opts *localrepo.FetchOpts, req *gitalypb.FetchRemoteRequest) error { + remoteURL := req.GetRemoteParams().GetUrl() + var config []git.ConfigPair + + for _, refspec := range getRefspecs(req.GetRemoteParams().GetMirrorRefmaps()) { + config = append(config, git.ConfigPair{ + Key: "remote.inmemory.fetch", Value: refspec, + }) } - return out, nil + if resolvedAddress := req.GetRemoteParams().GetResolvedAddress(); resolvedAddress != "" { + modifiedURL, resolveConfig, err := git.GetURLAndResolveConfig(remoteURL, resolvedAddress) + if err != nil { + return fmt.Errorf("couldn't get curloptResolve config: %w", err) + } + + remoteURL = modifiedURL + config = append(config, resolveConfig...) + } + + config = append(config, git.ConfigPair{Key: "remote.inmemory.url", Value: remoteURL}) + + if authHeader := req.GetRemoteParams().GetHttpAuthorizationHeader(); authHeader != "" { + config = append(config, git.ConfigPair{ + Key: fmt.Sprintf("http.%s.extraHeader", req.GetRemoteParams().GetUrl()), + Value: "Authorization: " + authHeader, + }) + } + + opts.CommandOptions = append(opts.CommandOptions, git.WithConfigEnv(config...)) + + return nil } func didTagsChange(r io.Reader) bool { @@ -160,7 +371,7 @@ func (s *server) validateFetchRemoteRequest(req *gitalypb.FetchRemoteRequest) er return nil } -func (s *server) getRefspecs(refmaps []string) []string { +func getRefspecs(refmaps []string) []string { if len(refmaps) == 0 { return []string{"refs/*:refs/*"} } diff --git a/internal/gitaly/service/repository/fetch_remote_test.go b/internal/gitaly/service/repository/fetch_remote_test.go index c62faf395..76fdb7ee4 100644 --- a/internal/gitaly/service/repository/fetch_remote_test.go +++ b/internal/gitaly/service/repository/fetch_remote_test.go @@ -2,6 +2,7 @@ package repository import ( "bytes" + "context" "fmt" "io" "net/http" @@ -11,6 +12,7 @@ import ( "testing" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/featureflag" "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" @@ -49,8 +51,11 @@ manually using update-ref with the fetch being just a dry-run. Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/3780`) t.Parallel() + testhelper.NewFeatureSets(featureflag.AtomicFetchRemote).Run(t, testFetchRemote) +} - ctx := testhelper.Context(t) +func testFetchRemote(t *testing.T, ctx context.Context) { + t.Parallel() // Some of the tests require multiple calls to the clients each run struct // encompasses the expected data for a single run @@ -258,7 +263,14 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/3780`) expectedRefs: map[string]git.ObjectID{ "refs/heads/branch/conflict": commitID, }, - expectedErr: structerr.NewInternal(`fetch remote: "error: cannot lock ref 'refs/heads/branch': 'refs/heads/branch/conflict' exists; cannot create 'refs/heads/branch'\nerror: some local refs could not be updated; try running\n 'git remote prune inmemory' to remove any old, conflicting branches\n": exit status 1`), + expectedErr: testhelper.EnabledOrDisabledFlag(ctx, featureflag.AtomicFetchRemote, + testhelper.WithInterceptedMetadataItems( + structerr.NewInternal("preparing reference update: file directory conflict"), + structerr.MetadataItem{Key: "conflicting_reference", Value: "refs/heads/branch"}, + structerr.MetadataItem{Key: "existing_reference", Value: "refs/heads/branch/conflict"}, + ), + structerr.NewInternal(`fetch remote: "error: cannot lock ref 'refs/heads/branch': 'refs/heads/branch/conflict' exists; cannot create 'refs/heads/branch'\nerror: some local refs could not be updated; try running\n 'git remote prune inmemory' to remove any old, conflicting branches\n": exit status 1`), + ), }, }, } @@ -316,7 +328,14 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/3780`) expectedRefs: map[string]git.ObjectID{ "refs/heads/branch": commitID, }, - expectedErr: structerr.NewInternal(`fetch remote: "error: cannot lock ref 'refs/heads/branch/conflict': 'refs/heads/branch' exists; cannot create 'refs/heads/branch/conflict'\nerror: some local refs could not be updated; try running\n 'git remote prune inmemory' to remove any old, conflicting branches\n": exit status 1`), + expectedErr: testhelper.EnabledOrDisabledFlag(ctx, featureflag.AtomicFetchRemote, + testhelper.WithInterceptedMetadataItems( + structerr.NewInternal("preparing reference update: file directory conflict"), + structerr.MetadataItem{Key: "conflicting_reference", Value: "refs/heads/branch/conflict"}, + structerr.MetadataItem{Key: "existing_reference", Value: "refs/heads/branch"}, + ), + structerr.NewInternal(`fetch remote: "error: cannot lock ref 'refs/heads/branch/conflict': 'refs/heads/branch' exists; cannot create 'refs/heads/branch/conflict'\nerror: some local refs could not be updated; try running\n 'git remote prune inmemory' to remove any old, conflicting branches\n": exit status 1`), + ), }, }, } @@ -448,7 +467,7 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/3780`) }, }, { - desc: "without force fails with diverging refs", + desc: "without force diverging refs not updated", setup: func(t *testing.T, cfg config.Cfg) setupData { _, remoteRepoPath := gittest.CreateRepository(t, ctx, cfg) repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg) @@ -469,7 +488,14 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/3780`) runs: []run{ { expectedRefs: map[string]git.ObjectID{"refs/heads/master": commitID}, - expectedErr: structerr.NewInternal("fetch remote: exit status 1"), + expectedResponse: testhelper.EnabledOrDisabledFlag(ctx, featureflag.AtomicFetchRemote, + &gitalypb.FetchRemoteResponse{TagsChanged: true}, + nil, + ), + expectedErr: testhelper.EnabledOrDisabledFlag(ctx, featureflag.AtomicFetchRemote, + nil, + error(structerr.NewInternal("fetch remote: exit status 1")), + ), }, }, } @@ -532,7 +558,14 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/3780`) "refs/heads/master": remoteCommitID, "refs/tags/v1": commitID, }, - expectedErr: structerr.NewInternal("fetch remote: exit status 1"), + expectedResponse: testhelper.EnabledOrDisabledFlag(ctx, featureflag.AtomicFetchRemote, + &gitalypb.FetchRemoteResponse{TagsChanged: true}, + nil, + ), + expectedErr: testhelper.EnabledOrDisabledFlag(ctx, featureflag.AtomicFetchRemote, + nil, + error(structerr.NewInternal("fetch remote: exit status 1")), + ), }, }, } @@ -644,7 +677,14 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/3780`) "refs/heads/main": localDivergingID, "refs/heads/branch": remoteUpdatedID, }, - expectedErr: structerr.NewInternal("fetch remote: exit status 1"), + expectedResponse: testhelper.EnabledOrDisabledFlag(ctx, featureflag.AtomicFetchRemote, + &gitalypb.FetchRemoteResponse{TagsChanged: true}, + nil, + ), + expectedErr: testhelper.EnabledOrDisabledFlag(ctx, featureflag.AtomicFetchRemote, + nil, + error(structerr.NewInternal("fetch remote: exit status 1")), + ), }, }, } @@ -681,7 +721,14 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/3780`) "refs/heads/main": localDivergingID, "refs/tags/v1.0.0": remoteTagID, }, - expectedErr: structerr.NewInternal("fetch remote: exit status 1"), + expectedResponse: testhelper.EnabledOrDisabledFlag(ctx, featureflag.AtomicFetchRemote, + &gitalypb.FetchRemoteResponse{TagsChanged: true}, + nil, + ), + expectedErr: testhelper.EnabledOrDisabledFlag(ctx, featureflag.AtomicFetchRemote, + nil, + error(structerr.NewInternal("fetch remote: exit status 1")), + ), }, }, } @@ -939,8 +986,12 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/3780`) func TestFetchRemote_sshCommand(t *testing.T) { t.Parallel() + testhelper.NewFeatureSets(featureflag.AtomicFetchRemote).Run(t, testFetchRemoteSSHCommand) +} + +func testFetchRemoteSSHCommand(t *testing.T, ctx context.Context) { + t.Parallel() - ctx := testhelper.Context(t) cfg := testcfg.Build(t) outputPath := filepath.Join(testhelper.TempDir(t), "output") @@ -1019,17 +1070,23 @@ func TestFetchRemote_sshCommand(t *testing.T) { func TestFetchRemote_transaction(t *testing.T) { t.Parallel() + testhelper.NewFeatureSets(featureflag.AtomicFetchRemote).Run(t, testFetchRemoteTransaction) +} - ctx := testhelper.Context(t) +func testFetchRemoteTransaction(t *testing.T, ctx context.Context) { + t.Parallel() remoteCfg := testcfg.Build(t) _, remoteRepoPath := gittest.CreateRepository(t, ctx, remoteCfg, gittest.CreateRepositoryConfig{ SkipCreationViaService: true, }) + gittest.WriteCommit(t, remoteCfg, remoteRepoPath, gittest.WithBranch("foobar")) + targetGitCmdFactory := gittest.NewCommandFactory(t, remoteCfg) port := gittest.HTTPServer(t, ctx, targetGitCmdFactory, remoteRepoPath, nil) cfg := testcfg.Build(t) + testcfg.BuildGitalyHooks(t, cfg) txManager := transaction.NewTrackingManager() client, addr := runRepositoryService(t, cfg, testserver.WithTransactionManager(txManager)) cfg.SocketPath = addr @@ -1050,7 +1107,11 @@ func TestFetchRemote_transaction(t *testing.T) { }) require.NoError(t, err) - require.Equal(t, testhelper.GitalyOrPraefect(1, 3), len(txManager.Votes())) + if featureflag.AtomicFetchRemote.IsEnabled(ctx) { + require.Equal(t, testhelper.GitalyOrPraefect(2, 4), len(txManager.Votes())) + } else { + require.Equal(t, testhelper.GitalyOrPraefect(1, 3), len(txManager.Votes())) + } } func TestFetchRemote_pooledRepository(t *testing.T) { @@ -1058,8 +1119,11 @@ func TestFetchRemote_pooledRepository(t *testing.T) { Object pools are not yet supported with transaction management.`) t.Parallel() + testhelper.NewFeatureSets(featureflag.AtomicFetchRemote).Run(t, testFetchRemotePooledRepository) +} - ctx := testhelper.Context(t) +func testFetchRemotePooledRepository(t *testing.T, ctx context.Context) { + t.Parallel() // By default git-fetch(1) will always run with `core.alternateRefsCommand=exit 0 #`, which // effectively disables use of alternate refs. We can't just unset this value, so instead we |