Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorQuang-Minh Nguyen <qmnguyen@gitlab.com>2023-10-20 11:23:07 +0300
committerQuang-Minh Nguyen <qmnguyen@gitlab.com>2023-10-20 11:23:07 +0300
commit5f813c45f52e29875d3472dd119f78e1839910c8 (patch)
treedb0dcbdd4b4c090decf7a50d4b22e446cd792ae4
parent5bed5dbafee7fe74e2e7f9e0e4c7ccde6570eedf (diff)
parentd97213a565069016793801f1cd20b1edc45c6931 (diff)
Merge branch 'jt-atomic-fetch-remote' into 'master'
repository: Update references atomically in `FetchRemote` See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6432 Merged-by: Quang-Minh Nguyen <qmnguyen@gitlab.com> Approved-by: Quang-Minh Nguyen <qmnguyen@gitlab.com> Reviewed-by: Sami Hiltunen <shiltunen@gitlab.com> Reviewed-by: Quang-Minh Nguyen <qmnguyen@gitlab.com> Reviewed-by: Justin Tobler <jtobler@gitlab.com> Co-authored-by: Justin Tobler <jtobler@gitlab.com>
-rw-r--r--internal/featureflag/ff_atomic_fetch_remote.go9
-rw-r--r--internal/git/fetch_scanner.go102
-rw-r--r--internal/git/fetch_scanner_test.go184
-rw-r--r--internal/git/localrepo/remote.go17
-rw-r--r--internal/git/localrepo/remote_test.go47
-rw-r--r--internal/gitaly/service/repository/fetch_remote.go291
-rw-r--r--internal/gitaly/service/repository/fetch_remote_test.go88
7 files changed, 686 insertions, 52 deletions
diff --git a/internal/featureflag/ff_atomic_fetch_remote.go b/internal/featureflag/ff_atomic_fetch_remote.go
new file mode 100644
index 000000000..273b0c379
--- /dev/null
+++ b/internal/featureflag/ff_atomic_fetch_remote.go
@@ -0,0 +1,9 @@
+package featureflag
+
+// AtomicFetchRemote enables atomic transactions for the `FetchRemote` RPC.
+var AtomicFetchRemote = NewFeatureFlag(
+ "atomic_fetch_remote",
+ "v16.5.0",
+ "https://gitlab.com/gitlab-org/gitaly/-/issues/5640",
+ false,
+)
diff --git a/internal/git/fetch_scanner.go b/internal/git/fetch_scanner.go
index 4a546de43..2154a1f48 100644
--- a/internal/git/fetch_scanner.go
+++ b/internal/git/fetch_scanner.go
@@ -3,6 +3,8 @@ package git
import (
"bufio"
"bytes"
+ "errors"
+ "fmt"
"io"
"strings"
)
@@ -189,3 +191,103 @@ func parseFetchStatusLine(line []byte) (FetchStatusLine, bool) {
return out, true
}
+
+// FetchPorcelainStatusLine represents a line of status output from `git fetch` when the porcelain
+// option is enabled.
+type FetchPorcelainStatusLine struct {
+ Type RefUpdateType
+ OldOID ObjectID
+ NewOID ObjectID
+ Reference string
+}
+
+// FetchPorcelainScanner scans the output of `git fetch` when the porcelain option is enabled,
+// allowing information about references to be gathered.
+type FetchPorcelainScanner struct {
+ scanner *bufio.Scanner
+ lastLine FetchPorcelainStatusLine
+ hash ObjectHash
+ err error
+}
+
+// NewFetchPorcelainScanner returns a FetchPorcelainScanner.
+func NewFetchPorcelainScanner(r io.Reader, hash ObjectHash) *FetchPorcelainScanner {
+ return &FetchPorcelainScanner{scanner: bufio.NewScanner(r), hash: hash}
+}
+
+// Scan parses the next fetch status line from the fetch command output.
+func (f *FetchPorcelainScanner) Scan() bool {
+ if f.scanner.Scan() {
+ line, err := parseFetchPorcelainStatusLine(f.scanner.Bytes(), f.hash)
+ if err != nil {
+ f.err = err
+ return false
+ }
+
+ f.lastLine = line
+ return true
+ }
+
+ if f.scanner.Err() != nil {
+ f.err = f.scanner.Err()
+ }
+
+ return false
+}
+
+// Err returns any error encountered while scanning.
+func (f *FetchPorcelainScanner) Err() error {
+ return f.err
+}
+
+// StatusLine returns the status line information from the last scanned line.
+func (f *FetchPorcelainScanner) StatusLine() FetchPorcelainStatusLine {
+ return f.lastLine
+}
+
+// parseFetchPorcelainStatusLine parses a fetch status line.
+// The line format is as follows: <flag> <old-object-id> <new-object-id> <local-reference>
+// Each token is delimited by a single space and each line ends with a new-line. Further format
+// documentation can be found at https://git-scm.com/docs/git-fetch#_output.
+func parseFetchPorcelainStatusLine(line []byte, hash ObjectHash) (FetchPorcelainStatusLine, error) {
+ var status FetchPorcelainStatusLine
+
+ // Unfortunately, the reference update type must be checked and parsed first. This is because
+ // one of the possible values of the reference update type is <space>, which is also the same
+ // character used as the delimiter for the remaining tokens in the status line. Therefore, the
+ // line length is checked to make sure there will not be an out-of-bounds error when checking
+ // for the remaining tokens and the update reference type is parsed/validated.
+ if len(line) < 3 || line[1] != ' ' {
+ return FetchPorcelainStatusLine{}, errors.New("invalid status line")
+ }
+
+ // First character in the line represents the status of the reference.
+ status.Type = RefUpdateType(line[0])
+ if !status.Type.Valid() {
+ return FetchPorcelainStatusLine{}, fmt.Errorf("invalid reference update type: %q", status.Type)
+ }
+
+ // The remaining tokens in the status line can be safely split by the <space> delimiter.
+ refStatus := bytes.Split(line[2:], []byte(" "))
+ if len(refStatus) != 3 {
+ return FetchPorcelainStatusLine{}, errors.New("invalid status line")
+ }
+
+ var err error
+ status.OldOID, err = hash.FromHex(string(refStatus[0]))
+ if err != nil {
+ return FetchPorcelainStatusLine{}, fmt.Errorf("constructing old OID: %w", err)
+ }
+
+ status.NewOID, err = hash.FromHex(string(refStatus[1]))
+ if err != nil {
+ return FetchPorcelainStatusLine{}, fmt.Errorf("constructing new OID: %w", err)
+ }
+
+ status.Reference = string(refStatus[2])
+ if err := ValidateReference(status.Reference); err != nil {
+ return FetchPorcelainStatusLine{}, fmt.Errorf("validating reference: %w", err)
+ }
+
+ return status, nil
+}
diff --git a/internal/git/fetch_scanner_test.go b/internal/git/fetch_scanner_test.go
index d49feda93..d343f5587 100644
--- a/internal/git/fetch_scanner_test.go
+++ b/internal/git/fetch_scanner_test.go
@@ -1,6 +1,7 @@
package git
import (
+ "errors"
"strings"
"testing"
@@ -8,6 +9,8 @@ import (
)
func TestFetchScannerScan(t *testing.T) {
+ t.Parallel()
+
blank := FetchStatusLine{}
for _, tc := range []struct {
@@ -122,7 +125,10 @@ func TestFetchScannerScan(t *testing.T) {
success: true,
},
} {
+ tc := tc
t.Run(tc.desc, func(t *testing.T) {
+ t.Parallel()
+
scanner := NewFetchScanner(strings.NewReader(tc.data))
require.Equal(t, tc.success, scanner.Scan())
require.Equal(t, tc.expected, scanner.StatusLine())
@@ -131,3 +137,181 @@ func TestFetchScannerScan(t *testing.T) {
})
}
}
+
+func TestFetchPorcelainScannerScan(t *testing.T) {
+ t.Parallel()
+
+ for _, tc := range []struct {
+ desc string
+ data string
+ hash ObjectHash
+ expectedStatus FetchPorcelainStatusLine
+ expectedError error
+ success bool
+ }{
+ {
+ desc: "empty line",
+ data: "",
+ expectedStatus: FetchPorcelainStatusLine{},
+ },
+ {
+ desc: "blank line",
+ data: " ",
+ hash: ObjectHashSHA1,
+ expectedStatus: FetchPorcelainStatusLine{},
+ expectedError: errors.New("invalid status line"),
+ },
+ {
+ desc: "invalid flag",
+ data: "? 0000000000000000000000000000000000000000 0000000000000000000000000000000000000001 refs/heads/main",
+ hash: ObjectHashSHA1,
+ expectedStatus: FetchPorcelainStatusLine{},
+ expectedError: errors.New("invalid reference update type: '?'"),
+ },
+ {
+ desc: "valid fast-forward status",
+ data: " 0000000000000000000000000000000000000000 0000000000000000000000000000000000000001 refs/heads/main",
+ hash: ObjectHashSHA1,
+ expectedStatus: FetchPorcelainStatusLine{
+ Type: RefUpdateTypeFastForwardUpdate,
+ OldOID: ObjectID("0000000000000000000000000000000000000000"),
+ NewOID: ObjectID("0000000000000000000000000000000000000001"),
+ Reference: "refs/heads/main",
+ },
+ success: true,
+ },
+ {
+ desc: "valid forced status",
+ data: "+ 0000000000000000000000000000000000000000 0000000000000000000000000000000000000001 refs/heads/main",
+ hash: ObjectHashSHA1,
+ expectedStatus: FetchPorcelainStatusLine{
+ Type: RefUpdateTypeForcedUpdate,
+ OldOID: ObjectID("0000000000000000000000000000000000000000"),
+ NewOID: ObjectID("0000000000000000000000000000000000000001"),
+ Reference: "refs/heads/main",
+ },
+ success: true,
+ },
+ {
+ desc: "valid pruned status",
+ data: "- 0000000000000000000000000000000000000000 0000000000000000000000000000000000000001 refs/heads/main",
+ hash: ObjectHashSHA1,
+ expectedStatus: FetchPorcelainStatusLine{
+ Type: RefUpdateTypePruned,
+ OldOID: ObjectID("0000000000000000000000000000000000000000"),
+ NewOID: ObjectID("0000000000000000000000000000000000000001"),
+ Reference: "refs/heads/main",
+ },
+ success: true,
+ },
+ {
+ desc: "valid tag status",
+ data: "t 0000000000000000000000000000000000000000 0000000000000000000000000000000000000001 refs/heads/main",
+ hash: ObjectHashSHA1,
+ expectedStatus: FetchPorcelainStatusLine{
+ Type: RefUpdateTypeTagUpdate,
+ OldOID: ObjectID("0000000000000000000000000000000000000000"),
+ NewOID: ObjectID("0000000000000000000000000000000000000001"),
+ Reference: "refs/heads/main",
+ },
+ success: true,
+ },
+ {
+ desc: "valid fetched status",
+ data: "* 0000000000000000000000000000000000000000 0000000000000000000000000000000000000001 refs/heads/main",
+ hash: ObjectHashSHA1,
+ expectedStatus: FetchPorcelainStatusLine{
+ Type: RefUpdateTypeFetched,
+ OldOID: ObjectID("0000000000000000000000000000000000000000"),
+ NewOID: ObjectID("0000000000000000000000000000000000000001"),
+ Reference: "refs/heads/main",
+ },
+ success: true,
+ },
+ {
+ desc: "valid rejected status",
+ data: "! 0000000000000000000000000000000000000000 0000000000000000000000000000000000000001 refs/heads/main",
+ hash: ObjectHashSHA1,
+ expectedStatus: FetchPorcelainStatusLine{
+ Type: RefUpdateTypeUpdateFailed,
+ OldOID: ObjectID("0000000000000000000000000000000000000000"),
+ NewOID: ObjectID("0000000000000000000000000000000000000001"),
+ Reference: "refs/heads/main",
+ },
+ success: true,
+ },
+ {
+ desc: "valid up-to-date status",
+ data: "= 0000000000000000000000000000000000000000 0000000000000000000000000000000000000001 refs/heads/main",
+ hash: ObjectHashSHA1,
+ expectedStatus: FetchPorcelainStatusLine{
+ Type: RefUpdateTypeUnchanged,
+ OldOID: ObjectID("0000000000000000000000000000000000000000"),
+ NewOID: ObjectID("0000000000000000000000000000000000000001"),
+ Reference: "refs/heads/main",
+ },
+ success: true,
+ },
+ {
+ desc: "invalid sha1 old OID",
+ data: " 0 0000000000000000000000000000000000000001 refs/heads/main",
+ hash: ObjectHashSHA1,
+ expectedStatus: FetchPorcelainStatusLine{},
+ expectedError: errors.New("constructing old OID: invalid object ID: \"0\", expected length 40, got 1"),
+ },
+ {
+ desc: "invalid sha1 new OID",
+ data: " 0000000000000000000000000000000000000000 1 refs/heads/main",
+ hash: ObjectHashSHA1,
+ expectedStatus: FetchPorcelainStatusLine{},
+ expectedError: errors.New("constructing new OID: invalid object ID: \"1\", expected length 40, got 1"),
+ },
+ {
+ desc: "valid sha256 status",
+ data: " 0000000000000000000000000000000000000000000000000000000000000000 0000000000000000000000000000000000000000000000000000000000000001 refs/heads/main",
+ hash: ObjectHashSHA256,
+ expectedStatus: FetchPorcelainStatusLine{
+ Type: RefUpdateTypeFastForwardUpdate,
+ OldOID: ObjectID("0000000000000000000000000000000000000000000000000000000000000000"),
+ NewOID: ObjectID("0000000000000000000000000000000000000000000000000000000000000001"),
+ Reference: "refs/heads/main",
+ },
+ success: true,
+ },
+ {
+ desc: "invalid sha256 old OID",
+ data: " 0 0000000000000000000000000000000000000000000000000000000000000001 refs/heads/main",
+ hash: ObjectHashSHA256,
+ expectedStatus: FetchPorcelainStatusLine{},
+ expectedError: errors.New("constructing old OID: invalid object ID: \"0\", expected length 64, got 1"),
+ },
+ {
+ desc: "invalid sha256 new OID",
+ data: " 0000000000000000000000000000000000000000000000000000000000000000 1 refs/heads/main",
+ hash: ObjectHashSHA256,
+ expectedStatus: FetchPorcelainStatusLine{},
+ expectedError: errors.New("constructing new OID: invalid object ID: \"1\", expected length 64, got 1"),
+ },
+ {
+ desc: "invalid reference",
+ data: " 0000000000000000000000000000000000000000 0000000000000000000000000000000000000001 main",
+ hash: ObjectHashSHA1,
+ expectedStatus: FetchPorcelainStatusLine{},
+ expectedError: errors.New("validating reference: reference is not fully qualified"),
+ },
+ } {
+ tc := tc
+ t.Run(tc.desc, func(t *testing.T) {
+ t.Parallel()
+
+ scanner := NewFetchPorcelainScanner(strings.NewReader(tc.data), tc.hash)
+ require.Equal(t, tc.success, scanner.Scan())
+ require.Equal(t, tc.expectedStatus, scanner.StatusLine())
+ if tc.expectedError != nil {
+ require.EqualError(t, scanner.err, tc.expectedError.Error())
+ } else {
+ require.NoError(t, tc.expectedError)
+ }
+ })
+ }
+}
diff --git a/internal/git/localrepo/remote.go b/internal/git/localrepo/remote.go
index d6b6660c9..696512afb 100644
--- a/internal/git/localrepo/remote.go
+++ b/internal/git/localrepo/remote.go
@@ -50,6 +50,14 @@ type FetchOpts struct {
// https://git-scm.com/docs/git-fetch#Documentation/git-fetch.txt---tags
// https://git-scm.com/docs/git-fetch#Documentation/git-fetch.txt---no-tags
Tags FetchOptsTags
+ // DryRun, if enabled, performs the `git-fetch(1)` command without updating any references.
+ DryRun bool
+ // Porcelain controls `git-fetch(1)` command output and when enabled prints output in an
+ // easy-to-parse format. By default, `git-fetch(1)` output is suppressed by the `--quiet` flag.
+ // Therefore, the Verbose option must also be enabled to receive output.
+ Porcelain bool
+ // Stdout if set it would be used to redirect stdout stream into it.
+ Stdout io.Writer
// Stderr if set it would be used to redirect stderr stream into it.
Stderr io.Writer
// DisableTransactions will disable the reference-transaction hook and atomic transactions.
@@ -80,6 +88,7 @@ func (repo *Repo) FetchRemote(ctx context.Context, remoteName string, opts Fetch
commandOptions := []git.CmdOpt{
git.WithEnv(opts.Env...),
+ git.WithStdout(opts.Stdout),
git.WithStderr(opts.Stderr),
git.WithConfig(git.ConfigPair{
// Git is so kind to point out that we asked it to not show forced updates
@@ -198,6 +207,14 @@ func (opts FetchOpts) buildFlags() []git.Option {
flags = append(flags, git.Flag{Name: "--atomic"})
}
+ if opts.DryRun {
+ flags = append(flags, git.Flag{Name: "--dry-run"})
+ }
+
+ if opts.Porcelain {
+ flags = append(flags, git.Flag{Name: "--porcelain"})
+ }
+
// Even if we ask Git to not print any output and to force-update branches it will still
// compute whether branches have been force-updated only to discard that information again.
// Let's ask it not to given that this check can be quite expensive.
diff --git a/internal/git/localrepo/remote_test.go b/internal/git/localrepo/remote_test.go
index c27c56fef..138428253 100644
--- a/internal/git/localrepo/remote_test.go
+++ b/internal/git/localrepo/remote_test.go
@@ -172,6 +172,53 @@ func TestRepo_FetchRemote(t *testing.T) {
require.False(t, contains, "remote tracking branch should be pruned as it no longer exists on the remote")
})
+ t.Run("with dry-run", func(t *testing.T) {
+ repo, _ := initBareWithRemote(t, "origin")
+
+ var stderr bytes.Buffer
+ require.NoError(t, repo.FetchRemote(ctx, "origin", FetchOpts{Stderr: &stderr, DryRun: true}))
+
+ require.Empty(t, stderr.String(), "it should not produce output as it is called with --quiet flag by default")
+
+ // Prior to the fetch, the repository did not contain any references. Consequently, we do
+ // not expect the repository to contain any references because the fetch performed was a
+ // dry-run.
+ refs, err := repo.GetReferences(ctx)
+ require.NoError(t, err)
+ require.Len(t, refs, 0)
+ })
+
+ t.Run("with porcelain", func(t *testing.T) {
+ repo, _ := initBareWithRemote(t, "origin")
+
+ // The porcelain fetch option write output to stdout in an easy-to-parse format. By default,
+ // output is suppressed by the --quiet flag. The Verbose option must also be enabled to
+ // receive output.
+ var stdout bytes.Buffer
+ require.NoError(t, repo.FetchRemote(ctx, "origin", FetchOpts{
+ Stdout: &stdout,
+ Porcelain: true,
+ Verbose: true,
+ }))
+
+ hash, err := repo.ObjectHash(ctx)
+ require.NoError(t, err)
+ scanner := git.NewFetchPorcelainScanner(&stdout, hash)
+
+ // Scan the output for expected references.
+ require.True(t, scanner.Scan())
+ require.Equal(t, git.RefUpdateTypeFetched, scanner.StatusLine().Type)
+ require.Equal(t, "refs/remotes/origin/main", scanner.StatusLine().Reference)
+ require.True(t, scanner.Scan())
+ require.Equal(t, git.RefUpdateTypeFetched, scanner.StatusLine().Type)
+ require.Equal(t, "refs/tags/v1.0.0", scanner.StatusLine().Reference)
+
+ // Since the remote only contains two references, there should be nothing left in the buffer
+ // to scan.
+ require.False(t, scanner.Scan())
+ require.Nil(t, scanner.Err())
+ })
+
t.Run("with no tags", func(t *testing.T) {
repo, testRepoPath := initBareWithRemote(t, "origin")
diff --git a/internal/gitaly/service/repository/fetch_remote.go b/internal/gitaly/service/repository/fetch_remote.go
index ae041e265..480001477 100644
--- a/internal/gitaly/service/repository/fetch_remote.go
+++ b/internal/gitaly/service/repository/fetch_remote.go
@@ -5,10 +5,14 @@ import (
"context"
"fmt"
"io"
+ "strings"
"time"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/featureflag"
"gitlab.com/gitlab-org/gitaly/v16/internal/git"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/git/quarantine"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
"gitlab.com/gitlab-org/gitaly/v16/internal/transaction/txinfo"
@@ -21,73 +25,246 @@ func (s *server) FetchRemote(ctx context.Context, req *gitalypb.FetchRemoteReque
return nil, err
}
- var stderr bytes.Buffer
+ if req.GetTimeout() > 0 {
+ var cancel context.CancelFunc
+ ctx, cancel = context.WithTimeout(ctx, time.Duration(req.GetTimeout())*time.Second)
+ defer cancel()
+ }
+
+ var tagsChanged bool
+ var err error
+ if featureflag.AtomicFetchRemote.IsEnabled(ctx) {
+ tagsChanged, err = s.fetchRemoteAtomic(ctx, req)
+ } else {
+ tagsChanged, err = s.fetchRemote(ctx, req)
+ }
+ if err != nil {
+ return nil, err
+ }
+
+ return &gitalypb.FetchRemoteResponse{TagsChanged: tagsChanged}, nil
+}
+
+// fetchRemoteAtomic fetches changes from the specified remote repository. To be atomic, fetched
+// objects are first quarantined and only migrated before committing the reference transaction.
+func (s *server) fetchRemoteAtomic(ctx context.Context, req *gitalypb.FetchRemoteRequest) (_ bool, returnedErr error) {
+ var stdout, stderr bytes.Buffer
opts := localrepo.FetchOpts{
- Stderr: &stderr,
- Force: req.Force,
- Prune: !req.NoPrune,
- Tags: localrepo.FetchOptsTagsAll,
- Verbose: req.GetCheckTagsChanged(),
+ Stdout: &stdout,
+ Stderr: &stderr,
+ Force: req.Force,
+ Prune: !req.NoPrune,
+ Tags: localrepo.FetchOptsTagsAll,
+ Verbose: true,
+ // Transactions are disabled during fetch operation because no references are updated when
+ // the dry-run option is enabled. Instead, the reference-transaction hook is performed
+ // during the subsequent execution of `git-update-ref(1)`.
DisableTransactions: true,
+ // When the `dry-run` option is used with `git-fetch(1)`, Git objects are received without
+ // performing reference updates. This is used to quarantine objects on the initial fetch and
+ // migration to occur only during reference update.
+ DryRun: true,
+ // The `porcelain` option outputs reference update information from `git-fetch(1) to stdout.
+ // Since references are not updated during a `git-fetch(1)` dry-run, the reference
+ // information is used during `git-update-ref(1)` execution to update the appropriate
+ // corresponding references.
+ Porcelain: true,
}
if req.GetNoTags() {
opts.Tags = localrepo.FetchOptsTagsNone
}
- repo := s.localrepo(req.GetRepository())
- remoteName := "inmemory"
- remoteURL := req.GetRemoteParams().GetUrl()
- var config []git.ConfigPair
+ if err := buildCommandOpts(&opts, req); err != nil {
+ return false, err
+ }
- for _, refspec := range s.getRefspecs(req.GetRemoteParams().GetMirrorRefmaps()) {
- config = append(config, git.ConfigPair{
- Key: "remote.inmemory.fetch", Value: refspec,
- })
+ sshCommand, cleanup, err := git.BuildSSHInvocation(ctx, s.logger, req.GetSshKey(), req.GetKnownHosts())
+ if err != nil {
+ return false, err
}
+ defer cleanup()
- if resolvedAddress := req.GetRemoteParams().GetResolvedAddress(); resolvedAddress != "" {
- modifiedURL, resolveConfig, err := git.GetURLAndResolveConfig(remoteURL, resolvedAddress)
- if err != nil {
- return nil, fmt.Errorf("couldn't get curloptResolve config: %w", err)
+ opts.Env = append(opts.Env, "GIT_SSH_COMMAND="+sshCommand)
+
+ // When performing fetch, objects are received before references are updated. If references fail
+ // to be updated, unreachable objects could be left in the repository that would need to be
+ // garbage collected. To be more atomic, a quarantine directory is set up where objects will be
+ // fetched prior to being migrated to the main repository when reference updates are committed.
+ quarantineDir, err := quarantine.New(ctx, req.GetRepository(), s.logger, s.locator)
+ if err != nil {
+ return false, fmt.Errorf("creating quarantine directory: %w", err)
+ }
+
+ quarantineRepo := s.localrepo(quarantineDir.QuarantinedRepo())
+ if err := quarantineRepo.FetchRemote(ctx, "inmemory", opts); err != nil {
+ // When `git-fetch(1)` fails to apply all reference updates successfully, the command
+ // returns `exit status 1`. Despite this error, successful reference updates should still be
+ // applied during the subsequent `git-update-ref(1)`. To differentiate between regular
+ // errors and failed reference updates, stderr is checked for an error message. If an error
+ // message is present, it is determined that an error occurred and the operation halts.
+ errMsg := stderr.String()
+ if errMsg != "" {
+ return false, structerr.NewInternal("fetch remote: %q: %w", errMsg, err)
}
- remoteURL = modifiedURL
- config = append(config, resolveConfig...)
+ // Some errors during the `git-fetch(1)` operation do not print to stderr. If the error
+ // message is not `exit status 1`, it is determined that the error is unrelated to failed
+ // reference updates and the operation halts. Otherwise, it is assumed the error is from a
+ // failed reference update and the operation proceeds to update references.
+ if err.Error() != "exit status 1" {
+ return false, structerr.NewInternal("fetch remote: %w", err)
+ }
}
- config = append(config, git.ConfigPair{Key: "remote.inmemory.url", Value: remoteURL})
+ // A repository cannot contain references with F/D (file/directory) conflicts (i.e.
+ // `refs/heads/foo` and `refs/heads/foo/bar`). If fetching from the remote repository
+ // results in an F/D conflict, the reference update fails. In some cases a conflicting
+ // reference may exist locally that does not exist on the remote. In this scenario, if
+ // outdated references are first pruned locally, the F/D conflict can be avoided. When
+ // `git-fetch(1)` is performed with the `--prune` and `--dry-run` flags, the pruned
+ // references are also included in the output without performing any actual reference
+ // updates. Bulk atomic reference updates performed by `git-update-ref(1)` do not support
+ // F/D conflicts even if the conflicted reference is being pruned. Therefore, pruned
+ // references must be updated first in a separate transaction. To accommodate this, two
+ // different instances of `updateref.Updater` are used to keep the transactions separate.
+ prunedUpdater, err := updateref.New(ctx, quarantineRepo)
+ if err != nil {
+ return false, fmt.Errorf("spawning pruned updater: %w", err)
+ }
+ defer func() {
+ if err := prunedUpdater.Close(); err != nil && returnedErr == nil {
+ returnedErr = fmt.Errorf("cancel pruned updater: %w", err)
+ }
+ }()
- if authHeader := req.GetRemoteParams().GetHttpAuthorizationHeader(); authHeader != "" {
- config = append(config, git.ConfigPair{
- Key: fmt.Sprintf("http.%s.extraHeader", req.GetRemoteParams().GetUrl()),
- Value: "Authorization: " + authHeader,
- })
+ // All other reference updates can be queued as part of the same transaction.
+ refUpdater, err := updateref.New(ctx, quarantineRepo)
+ if err != nil {
+ return false, fmt.Errorf("spawning ref updater: %w", err)
}
+ defer func() {
+ if err := refUpdater.Close(); err != nil && returnedErr == nil {
+ returnedErr = fmt.Errorf("cancel ref updater: %w", err)
+ }
+ }()
- opts.CommandOptions = append(opts.CommandOptions, git.WithConfigEnv(config...))
+ if err := prunedUpdater.Start(); err != nil {
+ return false, fmt.Errorf("start reference transaction: %w", err)
+ }
+
+ if err := refUpdater.Start(); err != nil {
+ return false, fmt.Errorf("start reference transaction: %w", err)
+ }
+
+ objectHash, err := quarantineRepo.ObjectHash(ctx)
+ if err != nil {
+ return false, fmt.Errorf("detecting object hash: %w", err)
+ }
+
+ var tagsChanged bool
+
+ // Parse stdout to identify required reference updates. Reference updates are queued to the
+ // respective updater based on type.
+ scanner := git.NewFetchPorcelainScanner(&stdout, objectHash)
+ for scanner.Scan() {
+ status := scanner.StatusLine()
+
+ switch status.Type {
+ // Failed and unchanged reference updates do not need to be applied.
+ case git.RefUpdateTypeUpdateFailed, git.RefUpdateTypeUnchanged:
+ // Queue pruned references in a separate transaction to avoid F/D conflicts.
+ case git.RefUpdateTypePruned:
+ if err := prunedUpdater.Delete(git.ReferenceName(status.Reference)); err != nil {
+ return false, fmt.Errorf("queueing pruned ref for deletion: %w", err)
+ }
+ // Queue all other reference updates in the same transaction.
+ default:
+ if err := refUpdater.Update(git.ReferenceName(status.Reference), status.NewOID, status.OldOID); err != nil {
+ return false, fmt.Errorf("queueing ref to be updated: %w", err)
+ }
+
+ // While scanning reference updates, check if any tags changed.
+ if status.Type == git.RefUpdateTypeTagUpdate || (status.Type == git.RefUpdateTypeFetched && strings.HasPrefix(status.Reference, "refs/tags")) {
+ tagsChanged = true
+ }
+ }
+ }
+ if scanner.Err() != nil {
+ return false, fmt.Errorf("scanning fetch output: %w", scanner.Err())
+ }
+
+ // Prepare pruned references in separate transaction to avoid F/D conflicts.
+ if err := prunedUpdater.Prepare(); err != nil {
+ return false, fmt.Errorf("preparing reference prune: %w", err)
+ }
+
+ // Commit pruned references to complete transaction and apply changes.
+ if err := prunedUpdater.Commit(); err != nil {
+ return false, fmt.Errorf("committing reference prune: %w", err)
+ }
+
+ // Prepare the remaining queued reference updates.
+ if err := refUpdater.Prepare(); err != nil {
+ return false, fmt.Errorf("preparing reference update: %w", err)
+ }
+
+ // Before committing the remaining reference updates, fetched objects must be migrated out of
+ // the quarantine directory.
+ if err := quarantineDir.Migrate(); err != nil {
+ return false, fmt.Errorf("migrating quarantined objects: %w", err)
+ }
+
+ // Commit the remaining queued reference updates so the changes get applied.
+ if err := refUpdater.Commit(); err != nil {
+ return false, fmt.Errorf("committing reference update: %w", err)
+ }
+
+ if req.GetCheckTagsChanged() {
+ return tagsChanged, nil
+ }
+
+ // If the request does not specify to check if tags changed, return true as the default value.
+ return true, nil
+}
+
+func (s *server) fetchRemote(ctx context.Context, req *gitalypb.FetchRemoteRequest) (bool, error) {
+ var stderr bytes.Buffer
+ opts := localrepo.FetchOpts{
+ Stderr: &stderr,
+ Force: req.Force,
+ Prune: !req.NoPrune,
+ Tags: localrepo.FetchOptsTagsAll,
+ Verbose: req.GetCheckTagsChanged(),
+ DisableTransactions: true,
+ }
+
+ if req.GetNoTags() {
+ opts.Tags = localrepo.FetchOptsTagsNone
+ }
+
+ if err := buildCommandOpts(&opts, req); err != nil {
+ return false, err
+ }
sshCommand, cleanup, err := git.BuildSSHInvocation(ctx, s.logger, req.GetSshKey(), req.GetKnownHosts())
if err != nil {
- return nil, err
+ return false, err
}
defer cleanup()
opts.Env = append(opts.Env, "GIT_SSH_COMMAND="+sshCommand)
- if req.GetTimeout() > 0 {
- var cancel context.CancelFunc
- ctx, cancel = context.WithTimeout(ctx, time.Duration(req.GetTimeout())*time.Second)
- defer cancel()
- }
+ repo := s.localrepo(req.GetRepository())
+ remoteName := "inmemory"
if err := repo.FetchRemote(ctx, remoteName, opts); err != nil {
errMsg := stderr.String()
if errMsg != "" {
- return nil, structerr.NewInternal("fetch remote: %q: %w", errMsg, err)
+ return false, structerr.NewInternal("fetch remote: %q: %w", errMsg, err)
}
- return nil, structerr.NewInternal("fetch remote: %w", err)
+ return false, structerr.NewInternal("fetch remote: %w", err)
}
// Ideally, we'd do the voting process via git-fetch(1) using the reference-transaction
@@ -116,15 +293,49 @@ func (s *server) FetchRemote(ctx context.Context, req *gitalypb.FetchRemoteReque
return s.txManager.Vote(ctx, tx, vote, voting.UnknownPhase)
}); err != nil {
- return nil, structerr.NewAborted("failed vote on refs: %w", err)
+ return false, structerr.NewAborted("failed vote on refs: %w", err)
}
- out := &gitalypb.FetchRemoteResponse{TagsChanged: true}
+ tagsChanged := true
if req.GetCheckTagsChanged() {
- out.TagsChanged = didTagsChange(&stderr)
+ tagsChanged = didTagsChange(&stderr)
+ }
+
+ return tagsChanged, nil
+}
+
+func buildCommandOpts(opts *localrepo.FetchOpts, req *gitalypb.FetchRemoteRequest) error {
+ remoteURL := req.GetRemoteParams().GetUrl()
+ var config []git.ConfigPair
+
+ for _, refspec := range getRefspecs(req.GetRemoteParams().GetMirrorRefmaps()) {
+ config = append(config, git.ConfigPair{
+ Key: "remote.inmemory.fetch", Value: refspec,
+ })
}
- return out, nil
+ if resolvedAddress := req.GetRemoteParams().GetResolvedAddress(); resolvedAddress != "" {
+ modifiedURL, resolveConfig, err := git.GetURLAndResolveConfig(remoteURL, resolvedAddress)
+ if err != nil {
+ return fmt.Errorf("couldn't get curloptResolve config: %w", err)
+ }
+
+ remoteURL = modifiedURL
+ config = append(config, resolveConfig...)
+ }
+
+ config = append(config, git.ConfigPair{Key: "remote.inmemory.url", Value: remoteURL})
+
+ if authHeader := req.GetRemoteParams().GetHttpAuthorizationHeader(); authHeader != "" {
+ config = append(config, git.ConfigPair{
+ Key: fmt.Sprintf("http.%s.extraHeader", req.GetRemoteParams().GetUrl()),
+ Value: "Authorization: " + authHeader,
+ })
+ }
+
+ opts.CommandOptions = append(opts.CommandOptions, git.WithConfigEnv(config...))
+
+ return nil
}
func didTagsChange(r io.Reader) bool {
@@ -160,7 +371,7 @@ func (s *server) validateFetchRemoteRequest(req *gitalypb.FetchRemoteRequest) er
return nil
}
-func (s *server) getRefspecs(refmaps []string) []string {
+func getRefspecs(refmaps []string) []string {
if len(refmaps) == 0 {
return []string{"refs/*:refs/*"}
}
diff --git a/internal/gitaly/service/repository/fetch_remote_test.go b/internal/gitaly/service/repository/fetch_remote_test.go
index c62faf395..76fdb7ee4 100644
--- a/internal/gitaly/service/repository/fetch_remote_test.go
+++ b/internal/gitaly/service/repository/fetch_remote_test.go
@@ -2,6 +2,7 @@ package repository
import (
"bytes"
+ "context"
"fmt"
"io"
"net/http"
@@ -11,6 +12,7 @@ import (
"testing"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/featureflag"
"gitlab.com/gitlab-org/gitaly/v16/internal/git"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
@@ -49,8 +51,11 @@ manually using update-ref with the fetch being just a dry-run.
Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/3780`)
t.Parallel()
+ testhelper.NewFeatureSets(featureflag.AtomicFetchRemote).Run(t, testFetchRemote)
+}
- ctx := testhelper.Context(t)
+func testFetchRemote(t *testing.T, ctx context.Context) {
+ t.Parallel()
// Some of the tests require multiple calls to the clients each run struct
// encompasses the expected data for a single run
@@ -258,7 +263,14 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/3780`)
expectedRefs: map[string]git.ObjectID{
"refs/heads/branch/conflict": commitID,
},
- expectedErr: structerr.NewInternal(`fetch remote: "error: cannot lock ref 'refs/heads/branch': 'refs/heads/branch/conflict' exists; cannot create 'refs/heads/branch'\nerror: some local refs could not be updated; try running\n 'git remote prune inmemory' to remove any old, conflicting branches\n": exit status 1`),
+ expectedErr: testhelper.EnabledOrDisabledFlag(ctx, featureflag.AtomicFetchRemote,
+ testhelper.WithInterceptedMetadataItems(
+ structerr.NewInternal("preparing reference update: file directory conflict"),
+ structerr.MetadataItem{Key: "conflicting_reference", Value: "refs/heads/branch"},
+ structerr.MetadataItem{Key: "existing_reference", Value: "refs/heads/branch/conflict"},
+ ),
+ structerr.NewInternal(`fetch remote: "error: cannot lock ref 'refs/heads/branch': 'refs/heads/branch/conflict' exists; cannot create 'refs/heads/branch'\nerror: some local refs could not be updated; try running\n 'git remote prune inmemory' to remove any old, conflicting branches\n": exit status 1`),
+ ),
},
},
}
@@ -316,7 +328,14 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/3780`)
expectedRefs: map[string]git.ObjectID{
"refs/heads/branch": commitID,
},
- expectedErr: structerr.NewInternal(`fetch remote: "error: cannot lock ref 'refs/heads/branch/conflict': 'refs/heads/branch' exists; cannot create 'refs/heads/branch/conflict'\nerror: some local refs could not be updated; try running\n 'git remote prune inmemory' to remove any old, conflicting branches\n": exit status 1`),
+ expectedErr: testhelper.EnabledOrDisabledFlag(ctx, featureflag.AtomicFetchRemote,
+ testhelper.WithInterceptedMetadataItems(
+ structerr.NewInternal("preparing reference update: file directory conflict"),
+ structerr.MetadataItem{Key: "conflicting_reference", Value: "refs/heads/branch/conflict"},
+ structerr.MetadataItem{Key: "existing_reference", Value: "refs/heads/branch"},
+ ),
+ structerr.NewInternal(`fetch remote: "error: cannot lock ref 'refs/heads/branch/conflict': 'refs/heads/branch' exists; cannot create 'refs/heads/branch/conflict'\nerror: some local refs could not be updated; try running\n 'git remote prune inmemory' to remove any old, conflicting branches\n": exit status 1`),
+ ),
},
},
}
@@ -448,7 +467,7 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/3780`)
},
},
{
- desc: "without force fails with diverging refs",
+ desc: "without force diverging refs not updated",
setup: func(t *testing.T, cfg config.Cfg) setupData {
_, remoteRepoPath := gittest.CreateRepository(t, ctx, cfg)
repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg)
@@ -469,7 +488,14 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/3780`)
runs: []run{
{
expectedRefs: map[string]git.ObjectID{"refs/heads/master": commitID},
- expectedErr: structerr.NewInternal("fetch remote: exit status 1"),
+ expectedResponse: testhelper.EnabledOrDisabledFlag(ctx, featureflag.AtomicFetchRemote,
+ &gitalypb.FetchRemoteResponse{TagsChanged: true},
+ nil,
+ ),
+ expectedErr: testhelper.EnabledOrDisabledFlag(ctx, featureflag.AtomicFetchRemote,
+ nil,
+ error(structerr.NewInternal("fetch remote: exit status 1")),
+ ),
},
},
}
@@ -532,7 +558,14 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/3780`)
"refs/heads/master": remoteCommitID,
"refs/tags/v1": commitID,
},
- expectedErr: structerr.NewInternal("fetch remote: exit status 1"),
+ expectedResponse: testhelper.EnabledOrDisabledFlag(ctx, featureflag.AtomicFetchRemote,
+ &gitalypb.FetchRemoteResponse{TagsChanged: true},
+ nil,
+ ),
+ expectedErr: testhelper.EnabledOrDisabledFlag(ctx, featureflag.AtomicFetchRemote,
+ nil,
+ error(structerr.NewInternal("fetch remote: exit status 1")),
+ ),
},
},
}
@@ -644,7 +677,14 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/3780`)
"refs/heads/main": localDivergingID,
"refs/heads/branch": remoteUpdatedID,
},
- expectedErr: structerr.NewInternal("fetch remote: exit status 1"),
+ expectedResponse: testhelper.EnabledOrDisabledFlag(ctx, featureflag.AtomicFetchRemote,
+ &gitalypb.FetchRemoteResponse{TagsChanged: true},
+ nil,
+ ),
+ expectedErr: testhelper.EnabledOrDisabledFlag(ctx, featureflag.AtomicFetchRemote,
+ nil,
+ error(structerr.NewInternal("fetch remote: exit status 1")),
+ ),
},
},
}
@@ -681,7 +721,14 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/3780`)
"refs/heads/main": localDivergingID,
"refs/tags/v1.0.0": remoteTagID,
},
- expectedErr: structerr.NewInternal("fetch remote: exit status 1"),
+ expectedResponse: testhelper.EnabledOrDisabledFlag(ctx, featureflag.AtomicFetchRemote,
+ &gitalypb.FetchRemoteResponse{TagsChanged: true},
+ nil,
+ ),
+ expectedErr: testhelper.EnabledOrDisabledFlag(ctx, featureflag.AtomicFetchRemote,
+ nil,
+ error(structerr.NewInternal("fetch remote: exit status 1")),
+ ),
},
},
}
@@ -939,8 +986,12 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/3780`)
func TestFetchRemote_sshCommand(t *testing.T) {
t.Parallel()
+ testhelper.NewFeatureSets(featureflag.AtomicFetchRemote).Run(t, testFetchRemoteSSHCommand)
+}
+
+func testFetchRemoteSSHCommand(t *testing.T, ctx context.Context) {
+ t.Parallel()
- ctx := testhelper.Context(t)
cfg := testcfg.Build(t)
outputPath := filepath.Join(testhelper.TempDir(t), "output")
@@ -1019,17 +1070,23 @@ func TestFetchRemote_sshCommand(t *testing.T) {
func TestFetchRemote_transaction(t *testing.T) {
t.Parallel()
+ testhelper.NewFeatureSets(featureflag.AtomicFetchRemote).Run(t, testFetchRemoteTransaction)
+}
- ctx := testhelper.Context(t)
+func testFetchRemoteTransaction(t *testing.T, ctx context.Context) {
+ t.Parallel()
remoteCfg := testcfg.Build(t)
_, remoteRepoPath := gittest.CreateRepository(t, ctx, remoteCfg, gittest.CreateRepositoryConfig{
SkipCreationViaService: true,
})
+ gittest.WriteCommit(t, remoteCfg, remoteRepoPath, gittest.WithBranch("foobar"))
+
targetGitCmdFactory := gittest.NewCommandFactory(t, remoteCfg)
port := gittest.HTTPServer(t, ctx, targetGitCmdFactory, remoteRepoPath, nil)
cfg := testcfg.Build(t)
+ testcfg.BuildGitalyHooks(t, cfg)
txManager := transaction.NewTrackingManager()
client, addr := runRepositoryService(t, cfg, testserver.WithTransactionManager(txManager))
cfg.SocketPath = addr
@@ -1050,7 +1107,11 @@ func TestFetchRemote_transaction(t *testing.T) {
})
require.NoError(t, err)
- require.Equal(t, testhelper.GitalyOrPraefect(1, 3), len(txManager.Votes()))
+ if featureflag.AtomicFetchRemote.IsEnabled(ctx) {
+ require.Equal(t, testhelper.GitalyOrPraefect(2, 4), len(txManager.Votes()))
+ } else {
+ require.Equal(t, testhelper.GitalyOrPraefect(1, 3), len(txManager.Votes()))
+ }
}
func TestFetchRemote_pooledRepository(t *testing.T) {
@@ -1058,8 +1119,11 @@ func TestFetchRemote_pooledRepository(t *testing.T) {
Object pools are not yet supported with transaction management.`)
t.Parallel()
+ testhelper.NewFeatureSets(featureflag.AtomicFetchRemote).Run(t, testFetchRemotePooledRepository)
+}
- ctx := testhelper.Context(t)
+func testFetchRemotePooledRepository(t *testing.T, ctx context.Context) {
+ t.Parallel()
// By default git-fetch(1) will always run with `core.alternateRefsCommand=exit 0 #`, which
// effectively disables use of alternate refs. We can't just unset this value, so instead we