Welcome to mirror list, hosted at ThFree Co, Russian Federation.

check_objects_exist.go « commit « service « gitaly « internal - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: c7b8c4f496665310802db32113a0797da1a27af9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package commit

import (
	"context"
	"io"

	"gitlab.com/gitlab-org/gitaly/internal/git"
	"gitlab.com/gitlab-org/gitaly/internal/git/catfile"
	"gitlab.com/gitlab-org/gitaly/internal/helper"
	"gitlab.com/gitlab-org/gitaly/internal/helper/chunk"
	"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
	"google.golang.org/protobuf/proto"
)

func (s *server) CheckObjectsExist(
	stream gitalypb.CommitService_CheckObjectsExistServer,
) error {
	ctx := stream.Context()

	request, err := stream.Recv()
	if err != nil {
		if err == io.EOF {
			// Ideally, we'd return an invalid-argument error in case there aren't any
			// requests. We can't do this though as this would diverge from Praefect's
			// behaviour, which always returns `io.EOF`.
			return err
		}
		return helper.ErrInternalf("receiving initial request: %w", err)
	}

	if request.GetRepository() == nil {
		return helper.ErrInvalidArgumentf("empty Repository")
	}

	objectInfoReader, cancel, err := s.catfileCache.ObjectInfoReader(
		ctx,
		s.localrepo(request.GetRepository()),
	)
	if err != nil {
		return helper.ErrInternalf("creating object info reader: %w", err)
	}
	defer cancel()

	chunker := chunk.New(&checkObjectsExistSender{stream: stream})
	for {
		// Note: we have already fetched the first request containing revisions further up,
		// so we only fetch the next request at the end of this loop.
		for _, revision := range request.GetRevisions() {
			if err := git.ValidateRevision(revision); err != nil {
				return helper.ErrInvalidArgumentf("invalid revision %q: %w", revision, err)
			}
		}

		if err := checkObjectsExist(ctx, request, objectInfoReader, chunker); err != nil {
			return helper.ErrInternalf("checking object existence: %w", err)
		}

		request, err = stream.Recv()
		if err != nil {
			if err == io.EOF {
				break
			}

			return helper.ErrInternalf("receiving request: %w", err)
		}
	}

	if err := chunker.Flush(); err != nil {
		return helper.ErrInternalf("flushing results: %w", err)
	}

	return nil
}

type checkObjectsExistSender struct {
	stream    gitalypb.CommitService_CheckObjectsExistServer
	revisions []*gitalypb.CheckObjectsExistResponse_RevisionExistence
}

func (c *checkObjectsExistSender) Send() error {
	return c.stream.Send(&gitalypb.CheckObjectsExistResponse{
		Revisions: c.revisions,
	})
}

func (c *checkObjectsExistSender) Reset() {
	c.revisions = c.revisions[:0]
}

func (c *checkObjectsExistSender) Append(m proto.Message) {
	c.revisions = append(c.revisions, m.(*gitalypb.CheckObjectsExistResponse_RevisionExistence))
}

func checkObjectsExist(
	ctx context.Context,
	request *gitalypb.CheckObjectsExistRequest,
	objectInfoReader catfile.ObjectInfoReader,
	chunker *chunk.Chunker,
) error {
	revisions := request.GetRevisions()

	for _, revision := range revisions {
		revisionExistence := gitalypb.CheckObjectsExistResponse_RevisionExistence{
			Name:   revision,
			Exists: true,
		}
		_, err := objectInfoReader.Info(ctx, git.Revision(revision))
		if err != nil {
			if catfile.IsNotFound(err) {
				revisionExistence.Exists = false
			} else {
				return helper.ErrInternalf("reading object info: %w", err)
			}
		}

		if err := chunker.Send(&revisionExistence); err != nil {
			return helper.ErrInternalf("adding to chunker: %w", err)
		}
	}

	return nil
}