Welcome to mirror list, hosted at ThFree Co, Russian Federation.

fetch_remote.go « repository « service « gitaly « internal - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: ca7de9fe640aded54b168c34abeeea135c62dcef (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
package repository

import (
	"bytes"
	"context"
	"fmt"
	"strings"
	"time"

	"gitlab.com/gitlab-org/gitaly/v16/internal/git"
	"gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
	"gitlab.com/gitlab-org/gitaly/v16/internal/git/quarantine"
	"gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref"
	"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
	"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
)

// FetchRemote fetches objects and references from the remote described by the request into the
// request's repository. When the request carries a positive timeout (in seconds), the whole
// operation is bounded by it. The response reports whether tags changed, as computed by
// fetchRemoteAtomic.
func (s *server) FetchRemote(ctx context.Context, req *gitalypb.FetchRemoteRequest) (*gitalypb.FetchRemoteResponse, error) {
	if err := s.validateFetchRemoteRequest(req); err != nil {
		return nil, err
	}

	if timeout := req.GetTimeout(); timeout > 0 {
		timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(timeout)*time.Second)
		defer cancel()
		ctx = timeoutCtx
	}

	tagsChanged, err := s.fetchRemoteAtomic(ctx, req)
	if err != nil {
		return nil, err
	}
	return &gitalypb.FetchRemoteResponse{TagsChanged: tagsChanged}, nil
}

// fetchRemoteAtomic fetches changes from the specified remote repository. To be atomic, fetched
// objects are first quarantined and only migrated before committing the reference transaction.
// fetchRemoteAtomic fetches changes from the specified remote repository. To be atomic, fetched
// objects are first quarantined and only migrated before committing the reference transaction.
//
// The fetch runs as a `git-fetch(1)` dry-run with porcelain output into a quarantined view of the
// repository. The reported reference updates are then applied through two separate
// `git-update-ref(1)` transactions: one for pruned references and one for everything else.
// If the request sets CheckTagsChanged, the returned bool reports whether any tag was updated
// or newly fetched. Otherwise it is always true.
func (s *server) fetchRemoteAtomic(ctx context.Context, req *gitalypb.FetchRemoteRequest) (_ bool, returnedErr error) {
	var stdout, stderr bytes.Buffer
	opts := localrepo.FetchOpts{
		Stdout:  &stdout,
		Stderr:  &stderr,
		Force:   req.Force,
		Prune:   !req.NoPrune,
		Tags:    localrepo.FetchOptsTagsAll,
		Verbose: true,
		// Transactions are disabled during fetch operation because no references are updated when
		// the dry-run option is enabled. Instead, the reference-transaction hook is performed
		// during the subsequent execution of `git-update-ref(1)`.
		DisableTransactions: true,
		// When the `dry-run` option is used with `git-fetch(1)`, Git objects are received without
		// performing reference updates. This is used to quarantine objects on the initial fetch and
		// migration to occur only during reference update.
		DryRun: true,
		// The `porcelain` option outputs reference update information from `git-fetch(1)` to stdout.
		// Since references are not updated during a `git-fetch(1)` dry-run, the reference
		// information is used during `git-update-ref(1)` execution to update the appropriate
		// corresponding references.
		Porcelain: true,
	}

	if req.GetNoTags() {
		opts.Tags = localrepo.FetchOptsTagsNone
	}

	if err := buildCommandOpts(&opts, req); err != nil {
		return false, err
	}

	sshCommand, cleanup, err := git.BuildSSHInvocation(ctx, s.logger, req.GetSshKey(), req.GetKnownHosts())
	if err != nil {
		return false, err
	}
	defer cleanup()

	opts.Env = append(opts.Env, "GIT_SSH_COMMAND="+sshCommand)

	// When performing fetch, objects are received before references are updated. If references fail
	// to be updated, unreachable objects could be left in the repository that would need to be
	// garbage collected. To be more atomic, a quarantine directory is set up where objects will be
	// fetched prior to being migrated to the main repository when reference updates are committed.
	quarantineDir, err := quarantine.New(ctx, req.GetRepository(), s.logger, s.locator)
	if err != nil {
		return false, fmt.Errorf("creating quarantine directory: %w", err)
	}

	quarantineRepo := s.localrepo(quarantineDir.QuarantinedRepo())
	if err := quarantineRepo.FetchRemote(ctx, "inmemory", opts); err != nil {
		// When `git-fetch(1)` fails to apply all reference updates successfully, the command
		// returns `exit status 1`. Despite this error, successful reference updates should still be
		// applied during the subsequent `git-update-ref(1)`. To differentiate between regular
		// errors and failed reference updates, stderr is checked for an error message. If an error
		// message is present, it is determined that an error occurred and the operation halts.
		errMsg := stderr.String()
		if errMsg != "" {
			return false, structerr.NewInternal("fetch remote: %q: %w", errMsg, err)
		}

		// Some errors during the `git-fetch(1)` operation do not print to stderr. If the error
		// message is not `exit status 1`, it is determined that the error is unrelated to failed
		// reference updates and the operation halts. Otherwise, it is assumed the error is from a
		// failed reference update and the operation proceeds to update references.
		// NOTE(review): comparing the error text is fragile if the error ever gets wrapped;
		// inspecting the process exit code would be more robust.
		if err.Error() != "exit status 1" {
			return false, structerr.NewInternal("fetch remote: %w", err)
		}
	}

	// A repository cannot contain references with F/D (file/directory) conflicts (i.e.
	// `refs/heads/foo` and `refs/heads/foo/bar`). If fetching from the remote repository
	// results in an F/D conflict, the reference update fails. In some cases a conflicting
	// reference may exist locally that does not exist on the remote. In this scenario, if
	// outdated references are first pruned locally, the F/D conflict can be avoided. When
	// `git-fetch(1)` is performed with the `--prune` and `--dry-run` flags, the pruned
	// references are also included in the output without performing any actual reference
	// updates. Bulk atomic reference updates performed by `git-update-ref(1)` do not support
	// F/D conflicts even if the conflicted reference is being pruned. Therefore, pruned
	// references must be updated first in a separate transaction. To accommodate this, two
	// different instances of `updateref.Updater` are used to keep the transactions separate.
	prunedUpdater, err := updateref.New(ctx, quarantineRepo)
	if err != nil {
		return false, fmt.Errorf("spawning pruned updater: %w", err)
	}
	defer func() {
		if err := prunedUpdater.Close(); err != nil && returnedErr == nil {
			returnedErr = fmt.Errorf("cancel pruned updater: %w", err)
		}
	}()

	// All other reference updates can be queued as part of the same transaction.
	refUpdater, err := updateref.New(ctx, quarantineRepo)
	if err != nil {
		return false, fmt.Errorf("spawning ref updater: %w", err)
	}
	defer func() {
		if err := refUpdater.Close(); err != nil && returnedErr == nil {
			returnedErr = fmt.Errorf("cancel ref updater: %w", err)
		}
	}()

	if err := prunedUpdater.Start(); err != nil {
		return false, fmt.Errorf("start reference transaction: %w", err)
	}

	if err := refUpdater.Start(); err != nil {
		return false, fmt.Errorf("start reference transaction: %w", err)
	}

	objectHash, err := quarantineRepo.ObjectHash(ctx)
	if err != nil {
		return false, fmt.Errorf("detecting object hash: %w", err)
	}

	var tagsChanged bool

	// Parse stdout to identify required reference updates. Reference updates are queued to the
	// respective updater based on type.
	scanner := git.NewFetchPorcelainScanner(&stdout, objectHash)
	for scanner.Scan() {
		status := scanner.StatusLine()

		switch status.Type {
		// Failed and unchanged reference updates do not need to be applied.
		case git.RefUpdateTypeUpdateFailed, git.RefUpdateTypeUnchanged:
		// Queue pruned references in a separate transaction to avoid F/D conflicts.
		case git.RefUpdateTypePruned:
			if err := prunedUpdater.Delete(git.ReferenceName(status.Reference)); err != nil {
				return false, fmt.Errorf("queueing pruned ref for deletion: %w", err)
			}
		// Queue all other reference updates in the same transaction.
		default:
			if err := refUpdater.Update(git.ReferenceName(status.Reference), status.NewOID, status.OldOID); err != nil {
				return false, fmt.Errorf("queueing ref to be updated: %w", err)
			}

			// While scanning reference updates, check if any tags changed. A newly fetched
			// reference only counts as a tag if it lives below the `refs/tags/` namespace.
			// Matching on the bare "refs/tags" prefix would also catch unrelated references
			// such as `refs/tagsfoo`.
			if status.Type == git.RefUpdateTypeTagUpdate || (status.Type == git.RefUpdateTypeFetched && strings.HasPrefix(status.Reference, "refs/tags/")) {
				tagsChanged = true
			}
		}
	}
	if err := scanner.Err(); err != nil {
		return false, fmt.Errorf("scanning fetch output: %w", err)
	}

	// Prepare pruned references in separate transaction to avoid F/D conflicts.
	if err := prunedUpdater.Prepare(); err != nil {
		return false, fmt.Errorf("preparing reference prune: %w", err)
	}

	// Commit pruned references to complete transaction and apply changes.
	if err := prunedUpdater.Commit(); err != nil {
		return false, fmt.Errorf("committing reference prune: %w", err)
	}

	// Prepare the remaining queued reference updates.
	if err := refUpdater.Prepare(); err != nil {
		return false, fmt.Errorf("preparing reference update: %w", err)
	}

	// Before committing the remaining reference updates, fetched objects must be migrated out of
	// the quarantine directory.
	if err := quarantineDir.Migrate(); err != nil {
		return false, fmt.Errorf("migrating quarantined objects: %w", err)
	}

	// Commit the remaining queued reference updates so the changes get applied.
	if err := refUpdater.Commit(); err != nil {
		return false, fmt.Errorf("committing reference update: %w", err)
	}

	if req.GetCheckTagsChanged() {
		return tagsChanged, nil
	}

	// If the request does not specify to check if tags changed, return true as the default value.
	return true, nil
}

// buildCommandOpts derives the git configuration for the "inmemory" remote from the request's
// remote parameters and appends it to opts.CommandOptions as a config environment.
//
// The configuration pairs are emitted in this order:
//  1. one fetch refspec per mirror refmap;
//  2. any curloptResolve settings when a resolved address is given;
//  3. the remote URL, possibly rewritten by the resolve step;
//  4. an optional HTTP Authorization header.
//
// It returns an error only if the resolve configuration cannot be built.
func buildCommandOpts(opts *localrepo.FetchOpts, req *gitalypb.FetchRemoteRequest) error {
	params := req.GetRemoteParams()
	requestURL := params.GetUrl()
	fetchURL := requestURL

	var config []git.ConfigPair
	for _, refspec := range getRefspecs(params.GetMirrorRefmaps()) {
		config = append(config, git.ConfigPair{Key: "remote.inmemory.fetch", Value: refspec})
	}

	if address := params.GetResolvedAddress(); address != "" {
		resolvedURL, resolveConfig, err := git.GetURLAndResolveConfig(fetchURL, address)
		if err != nil {
			return fmt.Errorf("couldn't get curloptResolve config: %w", err)
		}
		fetchURL = resolvedURL
		config = append(config, resolveConfig...)
	}

	config = append(config, git.ConfigPair{Key: "remote.inmemory.url", Value: fetchURL})

	if header := params.GetHttpAuthorizationHeader(); header != "" {
		// The header is scoped to the URL exactly as given in the request, not to the
		// possibly rewritten fetch URL.
		config = append(config, git.ConfigPair{
			Key:   fmt.Sprintf("http.%s.extraHeader", requestURL),
			Value: "Authorization: " + header,
		})
	}

	opts.CommandOptions = append(opts.CommandOptions, git.WithConfigEnv(config...))
	return nil
}

// validateFetchRemoteRequest checks that the request targets a repository accepted by the
// locator and carries remote parameters with a non-empty URL. Every failure is reported as an
// InvalidArgument error.
func (s *server) validateFetchRemoteRequest(req *gitalypb.FetchRemoteRequest) error {
	if err := s.locator.ValidateRepository(req.GetRepository()); err != nil {
		return structerr.NewInvalidArgument("%w", err)
	}

	switch params := req.GetRemoteParams(); {
	case params == nil:
		return structerr.NewInvalidArgument("missing remote params")
	case params.GetUrl() == "":
		return structerr.NewInvalidArgument("blank or empty remote URL")
	default:
		return nil
	}
}

// getRefspecs translates mirror refmaps into git fetch refspecs.
//
// The aliases "all_refs", "heads" and "tags" expand to mirroring all refs, branches and tags
// respectively. Any other value is passed through unchanged as a literal refspec. An empty (or
// nil) list mirrors all references, equivalent to `git clone --mirror`.
func getRefspecs(refmaps []string) []string {
	const mirrorAllRefs = "refs/*:refs/*"

	if len(refmaps) == 0 {
		return []string{mirrorAllRefs}
	}

	refspecs := make([]string, len(refmaps))
	for i, refmap := range refmaps {
		refspec := refmap
		switch refmap {
		case "all_refs":
			refspec = mirrorAllRefs
		case "heads":
			refspec = "refs/heads/*:refs/heads/*"
		case "tags":
			refspec = "refs/tags/*:refs/tags/*"
		}
		refspecs[i] = refspec
	}
	return refspecs
}