Welcome to mirror list, hosted at ThFree Co, Russian Federation.

update_remote_mirror.go « remote « service « gitaly « internal - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 8c41918a37a6491603d9e1e18999c2acee76b552 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
package remote

import (
	"context"
	"errors"
	"fmt"
	"io"
	"regexp"
	"strings"

	"gitlab.com/gitlab-org/gitaly/v15/internal/git"
	"gitlab.com/gitlab-org/gitaly/v15/internal/git/localrepo"
	"gitlab.com/gitlab-org/gitaly/v15/internal/helper"
	"gitlab.com/gitlab-org/gitaly/v15/internal/helper/text"
	"gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
)

const (
	// pushBatchSize is the maximum number of branches to push in a single push call.
	pushBatchSize = 10
	// maxDivergentRefs is the maximum number of divergent refs to return in UpdateRemoteMirror's
	// response.
	maxDivergentRefs = 100
)

func (s *server) UpdateRemoteMirror(stream gitalypb.RemoteService_UpdateRemoteMirrorServer) error {
	firstRequest, err := stream.Recv()
	if err != nil {
		return helper.ErrInternalf("receive first request: %v", err)
	}

	if err = validateUpdateRemoteMirrorRequest(stream.Context(), firstRequest); err != nil {
		return helper.ErrInvalidArgument(err)
	}

	if err := s.updateRemoteMirror(stream, firstRequest); err != nil {
		return helper.ErrInternal(err)
	}

	return nil
}

func (s *server) updateRemoteMirror(stream gitalypb.RemoteService_UpdateRemoteMirrorServer, firstRequest *gitalypb.UpdateRemoteMirrorRequest) error {
	ctx := stream.Context()

	branchMatchers := firstRequest.GetOnlyBranchesMatching()
	for {
		req, err := stream.Recv()
		if err != nil {
			if errors.Is(err, io.EOF) {
				break
			}

			return fmt.Errorf("receive: %w", err)
		}

		branchMatchers = append(branchMatchers, req.GetOnlyBranchesMatching()...)
	}

	referenceMatcher, err := newReferenceMatcher(branchMatchers)
	if err != nil {
		return fmt.Errorf("create reference matcher: %w", err)
	}

	repo := s.localrepo(firstRequest.GetRepository())
	remote := firstRequest.GetRemote()

	remoteSuffix, err := text.RandomHex(8)
	if err != nil {
		return fmt.Errorf("generating remote suffix: %w", err)
	}
	remoteName := "inmemory-" + remoteSuffix

	remoteConfig := []git.ConfigPair{
		{Key: fmt.Sprintf("remote.%s.url", remoteName), Value: remote.GetUrl()},
	}

	if authHeader := remote.GetHttpAuthorizationHeader(); authHeader != "" {
		remoteConfig = append(remoteConfig, git.ConfigPair{
			Key:   fmt.Sprintf("http.%s.extraHeader", remote.GetUrl()),
			Value: "Authorization: " + authHeader,
		})
	}
	if host := remote.GetHttpHost(); host != "" {
		configPair, err := git.GetCurloptResolveConfig(remote.GetUrl(), host)
		// ignore the error since this only works for http/https
		if err == nil {
			remoteConfig = append(remoteConfig, configPair)
		}
	}

	sshCommand, clean, err := git.BuildSSHInvocation(ctx, firstRequest.GetSshKey(), firstRequest.GetKnownHosts())
	if err != nil {
		return fmt.Errorf("build ssh invocation: %w", err)
	}
	defer clean()

	remoteRefsSlice, err := repo.GetRemoteReferences(ctx, remoteName,
		localrepo.WithPatterns("refs/heads/*", "refs/tags/*"),
		localrepo.WithConfig(remoteConfig...),
		localrepo.WithSSHCommand(sshCommand),
	)
	if err != nil {
		return fmt.Errorf("get remote references: %w", err)
	}

	localRefs, err := repo.GetReferences(ctx, "refs/heads/", "refs/tags/")
	if err != nil {
		return fmt.Errorf("get local references: %w", err)
	}

	defaultBranch, err := repo.GetDefaultBranch(ctx)
	if err != nil {
		return fmt.Errorf("get default branch: %w", err)
	}

	remoteRefs := make(map[git.ReferenceName]string, len(remoteRefsSlice))
	for _, ref := range remoteRefsSlice {
		if ref.IsSymbolic {
			// There should be no symbolic refs in refs/heads/ or refs/tags, so we'll just ignore
			// them if something has placed one there.
			continue
		}

		remoteRefs[ref.Name] = ref.Target
	}

	var divergentRefs [][]byte
	toUpdate := map[git.ReferenceName]string{}
	for _, localRef := range localRefs {
		if localRef.IsSymbolic {
			continue
		}

		remoteTarget, ok := remoteRefs[localRef.Name]
		if !ok {
			// ref does not exist on the mirror, it should be created
			toUpdate[localRef.Name] = localRef.Target
			delete(remoteRefs, localRef.Name)
			continue
		}

		if remoteTarget == localRef.Target {
			// ref is up to date on the mirror
			delete(remoteRefs, localRef.Name)
			continue
		}

		if firstRequest.GetKeepDivergentRefs() {
			isAncestor, err := repo.IsAncestor(ctx, git.Revision(remoteTarget), git.Revision(localRef.Target))
			if err != nil && !errors.Is(err, localrepo.InvalidCommitError(remoteTarget)) {
				return fmt.Errorf("is ancestor: %w", err)
			}

			if !isAncestor {
				// The mirror's reference has diverged from the local ref, or the mirror contains a commit
				// which is not present in the local repository.
				if referenceMatcher.MatchString(localRef.Name.String()) && len(divergentRefs) < maxDivergentRefs {
					// diverged branches on the mirror are only included in the response if they match
					// one of the branches in the selector
					divergentRefs = append(divergentRefs, []byte(localRef.Name))
				}

				delete(remoteRefs, localRef.Name)
				continue
			}
		}

		// the mirror's ref does not match ours, we should update it.
		toUpdate[localRef.Name] = localRef.Target
		delete(remoteRefs, localRef.Name)
	}

	toDelete := remoteRefs
	if len(defaultBranch) == 0 || firstRequest.GetKeepDivergentRefs() {
		toDelete = map[git.ReferenceName]string{}
	}

	for remoteRef, remoteCommitOID := range toDelete {
		isAncestor, err := repo.IsAncestor(ctx, git.Revision(remoteCommitOID), git.Revision(defaultBranch))
		if err != nil && !errors.Is(err, localrepo.InvalidCommitError(remoteCommitOID)) {
			return fmt.Errorf("is ancestor: %w", err)
		}

		if isAncestor {
			continue
		}

		// The commit in the extra branch in the remote repository has not been merged in to the
		// local repository's default branch. Keep it to avoid losing work.
		delete(toDelete, remoteRef)
	}

	var refspecs []string
	for prefix, references := range map[string]map[git.ReferenceName]string{
		"": toUpdate, ":": toDelete,
	} {
		for reference := range references {
			if !referenceMatcher.MatchString(reference.String()) {
				continue
			}

			refspecs = append(refspecs, prefix+reference.String())
			if reference == defaultBranch {
				// The default branch needs to be pushed in the first batch of refspecs as some features
				// depend on it existing in the repository. The default branch may not exist in the repo
				// yet if this is the first mirroring push.
				last := len(refspecs) - 1
				refspecs[0], refspecs[last] = refspecs[last], refspecs[0]
			}
		}
	}

	for len(refspecs) > 0 {
		batch := refspecs
		if len(refspecs) > pushBatchSize {
			batch = refspecs[:pushBatchSize]
		}

		refspecs = refspecs[len(batch):]

		if err := repo.Push(ctx, remoteName, batch, localrepo.PushOptions{
			SSHCommand: sshCommand,
			Force:      !firstRequest.KeepDivergentRefs,
			Config:     remoteConfig,
		}); err != nil {
			return fmt.Errorf("push to mirror: %w", err)
		}
	}

	return stream.SendAndClose(&gitalypb.UpdateRemoteMirrorResponse{DivergentRefs: divergentRefs})
}

// newReferenceMatcher returns a regexp which matches references that should
// be updated in the mirror repository. Tags are always matched successfully.
// branchMatchers optionally contain patterns that are used to match branches.
// The patterns should only include the branch name without the `refs/heads/`
// prefix. "*" can be used as a wildcard in the patterns. If no branchMatchers
// are specified, all branches are matched successfully.
func newReferenceMatcher(branchMatchers [][]byte) (*regexp.Regexp, error) {
	sb := &strings.Builder{}
	sb.WriteString("^refs/tags/.+$|^refs/heads/(")

	for i, expression := range branchMatchers {
		segments := strings.Split(string(expression), "*")
		for i := range segments {
			segments[i] = regexp.QuoteMeta(segments[i])
		}

		sb.WriteString(strings.Join(segments, ".*"))

		if i < len(branchMatchers)-1 {
			sb.WriteString("|")
		}
	}

	if len(branchMatchers) == 0 {
		sb.WriteString(".+")
	}

	sb.WriteString(")$")

	return regexp.Compile(sb.String())
}

func validateUpdateRemoteMirrorRequest(ctx context.Context, req *gitalypb.UpdateRemoteMirrorRequest) error {
	if req.GetRepository() == nil {
		return fmt.Errorf("empty Repository")
	}

	if req.GetRemote() == nil {
		return fmt.Errorf("missing Remote")
	}

	if req.GetRemote().GetUrl() == "" {
		return fmt.Errorf("remote is missing URL")
	}

	return nil
}