Welcome to mirror list, hosted at ThFree Co, Russian Federation.

get_blobs.go « blob « service « internal - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 4bb70ee5fbb7c04a9fe215d87a84212fba46914c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package blob

import (
	"io"
	"io/ioutil"

	"gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
	"gitlab.com/gitlab-org/gitaly/internal/git/catfile"
	"gitlab.com/gitlab-org/gitaly/internal/service/commit"
	"gitlab.com/gitlab-org/gitaly/streamio"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func sendGetBlobsResponse(req *gitalypb.GetBlobsRequest, stream gitalypb.BlobService_GetBlobsServer, c *catfile.Batch) error {
	for _, revisionPath := range req.RevisionPaths {
		revision := revisionPath.Revision
		path := revisionPath.Path

		treeEntry, err := commit.TreeEntryForRevisionAndPath(c, revision, string(path))
		if err != nil {
			return err
		}

		response := &gitalypb.GetBlobsResponse{Revision: revision, Path: path}

		if treeEntry == nil || len(treeEntry.Oid) == 0 {
			if err := stream.Send(response); err != nil {
				return status.Errorf(codes.Unavailable, "GetBlobs: send: %v", err)
			}

			continue
		}

		response.Mode = treeEntry.Mode
		response.Oid = treeEntry.Oid

		if treeEntry.Type == gitalypb.TreeEntry_COMMIT {
			response.IsSubmodule = true

			if err := stream.Send(response); err != nil {
				return status.Errorf(codes.Unavailable, "GetBlobs: send: %v", err)
			}

			continue
		}

		objectInfo, err := c.Info(treeEntry.Oid)
		if err != nil {
			return status.Errorf(codes.Internal, "GetBlobs: %v", err)
		}
		if objectInfo.Type != "blob" {
			return status.Errorf(codes.InvalidArgument, "GetBlobs: object at %s:%s is %s, not blob", revision, path, objectInfo.Type)
		}

		response.Size = objectInfo.Size

		var readLimit int64
		if req.Limit < 0 || req.Limit > objectInfo.Size {
			readLimit = objectInfo.Size
		} else {
			readLimit = req.Limit
		}

		// For correctness it does not matter, but for performance, the order is
		// important: first check if readlimit == 0, if not, only then create
		// blobReader.
		if readLimit == 0 {
			if err := stream.Send(response); err != nil {
				return status.Errorf(codes.Unavailable, "GetBlobs: send: %v", err)
			}
			continue
		}

		blobReader, err := c.Blob(objectInfo.Oid)
		if err != nil {
			return status.Errorf(codes.Internal, "GetBlobs: %v", err)
		}

		sw := streamio.NewWriter(func(p []byte) error {
			msg := &gitalypb.GetBlobsResponse{}
			if response != nil {
				msg = response
				response = nil
			}

			msg.Data = p

			return stream.Send(msg)
		})

		_, err = io.CopyN(sw, blobReader, readLimit)
		if err != nil {
			return status.Errorf(codes.Unavailable, "GetBlobs: send: %v", err)
		}

		if _, err := io.Copy(ioutil.Discard, blobReader); err != nil {
			return status.Errorf(codes.Unavailable, "GetBlobs: discarding data: %v", err)
		}
	}

	return nil
}

func (*server) GetBlobs(req *gitalypb.GetBlobsRequest, stream gitalypb.BlobService_GetBlobsServer) error {
	if req.Repository == nil {
		return status.Errorf(codes.InvalidArgument, "GetBlobs: empty Repository")
	}

	if len(req.RevisionPaths) == 0 {
		return status.Errorf(codes.InvalidArgument, "GetBlobs: empty RevisionPaths")
	}
	c, err := catfile.New(stream.Context(), req.Repository)
	if err != nil {
		return err
	}

	return sendGetBlobsResponse(req, stream, c)
}