1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
package commit
import (
"context"
"io"
"gitlab.com/gitlab-org/gitaly/v15/internal/git"
"gitlab.com/gitlab-org/gitaly/v15/internal/git/catfile"
"gitlab.com/gitlab-org/gitaly/v15/internal/helper"
"gitlab.com/gitlab-org/gitaly/v15/internal/helper/chunk"
"gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
"google.golang.org/protobuf/proto"
)
func (s *server) CheckObjectsExist(
stream gitalypb.CommitService_CheckObjectsExistServer,
) error {
ctx := stream.Context()
request, err := stream.Recv()
if err != nil {
if err == io.EOF {
// Ideally, we'd return an invalid-argument error in case there aren't any
// requests. We can't do this though as this would diverge from Praefect's
// behaviour, which always returns `io.EOF`.
return err
}
return helper.ErrInternalf("receiving initial request: %w", err)
}
if request.GetRepository() == nil {
return helper.ErrInvalidArgumentf("empty Repository")
}
objectInfoReader, cancel, err := s.catfileCache.ObjectInfoReader(
ctx,
s.localrepo(request.GetRepository()),
)
if err != nil {
return helper.ErrInternalf("creating object info reader: %w", err)
}
defer cancel()
chunker := chunk.New(&checkObjectsExistSender{stream: stream})
for {
// Note: we have already fetched the first request containing revisions further up,
// so we only fetch the next request at the end of this loop.
for _, revision := range request.GetRevisions() {
if err := git.ValidateRevision(revision); err != nil {
return helper.ErrInvalidArgumentf("invalid revision %q: %w", revision, err)
}
}
if err := checkObjectsExist(ctx, request, objectInfoReader, chunker); err != nil {
return helper.ErrInternalf("checking object existence: %w", err)
}
request, err = stream.Recv()
if err != nil {
if err == io.EOF {
break
}
return helper.ErrInternalf("receiving request: %w", err)
}
}
if err := chunker.Flush(); err != nil {
return helper.ErrInternalf("flushing results: %w", err)
}
return nil
}
type checkObjectsExistSender struct {
stream gitalypb.CommitService_CheckObjectsExistServer
revisions []*gitalypb.CheckObjectsExistResponse_RevisionExistence
}
func (c *checkObjectsExistSender) Send() error {
return c.stream.Send(&gitalypb.CheckObjectsExistResponse{
Revisions: c.revisions,
})
}
func (c *checkObjectsExistSender) Reset() {
c.revisions = c.revisions[:0]
}
func (c *checkObjectsExistSender) Append(m proto.Message) {
c.revisions = append(c.revisions, m.(*gitalypb.CheckObjectsExistResponse_RevisionExistence))
}
func checkObjectsExist(
ctx context.Context,
request *gitalypb.CheckObjectsExistRequest,
objectInfoReader catfile.ObjectInfoReader,
chunker *chunk.Chunker,
) error {
revisions := request.GetRevisions()
for _, revision := range revisions {
revisionExistence := gitalypb.CheckObjectsExistResponse_RevisionExistence{
Name: revision,
Exists: true,
}
_, err := objectInfoReader.Info(ctx, git.Revision(revision))
if err != nil {
if catfile.IsNotFound(err) {
revisionExistence.Exists = false
} else {
return helper.ErrInternalf("reading object info: %w", err)
}
}
if err := chunker.Send(&revisionExistence); err != nil {
return helper.ErrInternalf("adding to chunker: %w", err)
}
}
return nil
}
|