Welcome to mirror list, hosted at ThFree Co, Russian Federation.

update_remote_mirror.go « remote « service « gitaly « internal - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 86e1a142af2bbe6d24499fe5e4a23833da8ed423 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
package remote

import (
	"errors"
	"fmt"
	"io"
	"regexp"
	"strings"

	"gitlab.com/gitlab-org/gitaly/v14/internal/git"
	"gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo"
	"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/rubyserver"
	"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/ref"
	"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
	"gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
	"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
)

const (
	// pushBatchSize is the maximum number of branches to push in a single push call.
	pushBatchSize = 10
	// maxDivergentRefs is the maximum number of divergent refs to return in UpdateRemoteMirror's
	// response.
	maxDivergentRefs = 100
)

func (s *server) UpdateRemoteMirror(stream gitalypb.RemoteService_UpdateRemoteMirrorServer) error {
	firstRequest, err := stream.Recv()
	if err != nil {
		return helper.ErrInternalf("receive first request: %v", err)
	}

	if err = validateUpdateRemoteMirrorRequest(firstRequest); err != nil {
		return helper.ErrInvalidArgument(err)
	}

	if featureflag.IsEnabled(stream.Context(), featureflag.GoUpdateRemoteMirror) {
		if err := s.goUpdateRemoteMirror(stream, firstRequest); err != nil {
			return helper.ErrInternal(err)
		}

		return nil
	}

	if err := s.updateRemoteMirror(stream, firstRequest); err != nil {
		return helper.ErrInternal(err)
	}

	return nil
}

// updateRemoteMirror has lots of decorated errors to help us debug
// https://gitlab.com/gitlab-org/gitaly/issues/2156.
func (s *server) updateRemoteMirror(stream gitalypb.RemoteService_UpdateRemoteMirrorServer, firstRequest *gitalypb.UpdateRemoteMirrorRequest) error {
	ctx := stream.Context()
	client, err := s.ruby.RemoteServiceClient(ctx)
	if err != nil {
		return fmt.Errorf("get stub: %v", err)
	}

	clientCtx, err := rubyserver.SetHeaders(ctx, s.locator, firstRequest.GetRepository())
	if err != nil {
		return fmt.Errorf("set headers: %v", err)
	}

	rubyStream, err := client.UpdateRemoteMirror(clientCtx)
	if err != nil {
		return fmt.Errorf("create client: %v", err)
	}

	if err := rubyStream.Send(firstRequest); err != nil {
		return fmt.Errorf("first request to gitaly-ruby: %v", err)
	}

	err = rubyserver.Proxy(func() error {
		// Do not wrap errors in this callback: we must faithfully relay io.EOF
		request, err := stream.Recv()
		if err != nil {
			return err
		}

		return rubyStream.Send(request)
	})
	if err != nil {
		return fmt.Errorf("proxy request to gitaly-ruby: %v", err)
	}

	response, err := rubyStream.CloseAndRecv()
	if err != nil {
		return fmt.Errorf("close stream to gitaly-ruby: %v", err)
	}

	if err := stream.SendAndClose(response); err != nil {
		return fmt.Errorf("close stream to client: %v", err)
	}

	return nil
}

func (s *server) goUpdateRemoteMirror(stream gitalypb.RemoteService_UpdateRemoteMirrorServer, firstRequest *gitalypb.UpdateRemoteMirrorRequest) error {
	ctx := stream.Context()

	branchMatchers := firstRequest.GetOnlyBranchesMatching()
	for {
		req, err := stream.Recv()
		if err != nil {
			if errors.Is(err, io.EOF) {
				break
			}

			return fmt.Errorf("receive: %w", err)
		}

		branchMatchers = append(branchMatchers, req.GetOnlyBranchesMatching()...)
	}

	referenceMatcher, err := newReferenceMatcher(branchMatchers)
	if err != nil {
		return fmt.Errorf("create reference matcher: %w", err)
	}

	repo := s.localrepo(firstRequest.GetRepository())
	remoteRefsSlice, err := repo.GetRemoteReferences(ctx, firstRequest.GetRefName(), "refs/heads/*", "refs/tags/*")
	if err != nil {
		return fmt.Errorf("get remote references: %w", err)
	}

	localRefs, err := repo.GetReferences(ctx, "refs/heads/", "refs/tags/")
	if err != nil {
		return fmt.Errorf("get local references: %w", err)
	}

	if len(localRefs) == 0 {
		// https://gitlab.com/gitlab-org/gitaly/-/issues/3503
		return errors.New("close stream to gitaly-ruby: rpc error: code = Unknown desc = NoMethodError: undefined method `id' for nil:NilClass")
	}

	defaultBranch, err := ref.DefaultBranchName(ctx, repo)
	if err != nil {
		return fmt.Errorf("get default branch: %w", err)
	}

	remoteRefs := make(map[git.ReferenceName]string, len(remoteRefsSlice))
	for _, ref := range remoteRefsSlice {
		remoteRefs[ref.Name] = ref.Target
	}

	var divergentRefs [][]byte
	toUpdate := map[git.ReferenceName]string{}
	for _, localRef := range localRefs {
		remoteTarget, ok := remoteRefs[localRef.Name]
		if !ok {
			// ref does not exist on the mirror, it should be created
			toUpdate[localRef.Name] = localRef.Target
			delete(remoteRefs, localRef.Name)
			continue
		}

		if remoteTarget == localRef.Target {
			// ref is up to date on the mirror
			delete(remoteRefs, localRef.Name)
			continue
		}

		if firstRequest.GetKeepDivergentRefs() {
			isAncestor, err := repo.IsAncestor(ctx, git.Revision(remoteTarget), git.Revision(localRef.Target))
			if err != nil && !errors.Is(err, localrepo.InvalidCommitError(remoteTarget)) {
				return fmt.Errorf("is ancestor: %w", err)
			}

			if !isAncestor {
				// The mirror's reference has diverged from the local ref, or the mirror contains a commit
				// which is not present in the local repository.
				if referenceMatcher.MatchString(localRef.Name.String()) && len(divergentRefs) < maxDivergentRefs {
					// diverged branches on the mirror are only included in the response if they match
					// one of the branches in the selector
					divergentRefs = append(divergentRefs, []byte(localRef.Name))
				}

				delete(remoteRefs, localRef.Name)
				continue
			}
		}

		if localRef.Name == "refs/heads/tag" {
			// https://gitlab.com/gitlab-org/gitaly/-/issues/3502
			return errors.New("close stream to gitaly-ruby: rpc error: code = Unknown desc = Gitlab::Git::CommandError: fatal: tag shorthand without <tag>")
		}

		// the mirror's ref does not match ours, we should update it.
		toUpdate[localRef.Name] = localRef.Target
		delete(remoteRefs, localRef.Name)
	}

	toDelete := remoteRefs
	if firstRequest.GetKeepDivergentRefs() {
		toDelete = map[git.ReferenceName]string{}
	}

	seen := map[string]struct{}{}
	var refspecs []string
	for prefix, references := range map[string]map[git.ReferenceName]string{
		"": toUpdate, ":": toDelete,
	} {
		for reference := range references {
			if !referenceMatcher.MatchString(reference.String()) {
				continue
			}

			refspecs = append(refspecs, prefix+reference.String())
			if reference == git.ReferenceName(defaultBranch) {
				// The default branch needs to be pushed in the first batch of refspecs as some features
				// depend on it existing in the repository. The default branch may not exist in the repo
				// yet if this is the first mirroring push.
				last := len(refspecs) - 1
				refspecs[0], refspecs[last] = refspecs[last], refspecs[0]
			}

			// https://gitlab.com/gitlab-org/gitaly/-/issues/3504
			name := strings.TrimPrefix(reference.String(), "refs/heads/")
			if strings.HasPrefix(reference.String(), "refs/tags/") {
				name = strings.TrimPrefix(reference.String(), "refs/tags/")
			}

			if _, ok := seen[name]; ok {
				return errors.New("close stream to gitaly-ruby: rpc error: code = Unknown desc = Gitlab::Git::CommandError: error: src refspec master matches more than one")
			}

			seen[name] = struct{}{}
		}
	}

	if len(refspecs) > 0 {
		sshCommand, clean, err := git.BuildSSHInvocation(ctx, firstRequest.GetSshKey(), firstRequest.GetKnownHosts())
		if err != nil {
			return fmt.Errorf("build ssh invocation: %w", err)
		}
		defer clean()

		for len(refspecs) > 0 {
			batch := refspecs
			if len(refspecs) > pushBatchSize {
				batch = refspecs[:pushBatchSize]
			}

			refspecs = refspecs[len(batch):]

			// The refs could have been modified on the mirror during after we fetched them.
			// This could cause divergent refs to be force pushed over even with keep_divergent_refs set.
			// This could be addressed by force pushing only if the current ref still matches what
			// we received in the original fetch. https://gitlab.com/gitlab-org/gitaly/-/issues/3505
			if err := repo.Push(ctx, firstRequest.GetRefName(), batch, localrepo.PushOptions{SSHCommand: sshCommand}); err != nil {
				return fmt.Errorf("push to mirror: %w", err)
			}
		}
	}

	return stream.SendAndClose(&gitalypb.UpdateRemoteMirrorResponse{DivergentRefs: divergentRefs})
}

// newReferenceMatcher returns a regexp which matches references that should
// be updated in the mirror repository. Tags are always matched successfully.
// branchMatchers optionally contain patterns that are used to match branches.
// The patterns should only include the branch name without the `refs/heads/`
// prefix. "*" can be used as a wilcard in the patterns. If no branchMatchers
// are specified, all branches are matched successfully.
func newReferenceMatcher(branchMatchers [][]byte) (*regexp.Regexp, error) {
	sb := &strings.Builder{}
	sb.WriteString("^refs/tags/.+$|^refs/heads/(")

	for i, expression := range branchMatchers {
		segments := strings.Split(string(expression), "*")
		for i := range segments {
			segments[i] = regexp.QuoteMeta(segments[i])
		}

		sb.WriteString(strings.Join(segments, ".*"))

		if i < len(branchMatchers)-1 {
			sb.WriteString("|")
		}
	}

	if len(branchMatchers) == 0 {
		sb.WriteString(".+")
	}

	sb.WriteString(")$")

	return regexp.Compile(sb.String())
}

func validateUpdateRemoteMirrorRequest(req *gitalypb.UpdateRemoteMirrorRequest) error {
	if req.GetRepository() == nil {
		return fmt.Errorf("empty Repository")
	}
	if req.GetRefName() == "" {
		return fmt.Errorf("empty RefName")
	}

	return nil
}