1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
package rubyserver
import (
"context"
"io"
"os"
"strings"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v14/internal/transaction/txinfo"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"google.golang.org/grpc/metadata"
)
// rubyFeaturePrefix is the header-name prefix that identifies a Ruby-specific
// feature flag. Any incoming header whose name starts with this prefix is
// added to the forwarding allowlist by setHeaders automatically.
const rubyFeaturePrefix = "gitaly-feature-ruby-"
// Metadata keys that setHeaders attaches to the outgoing context.
const (
// storagePathHeader carries the value returned by locator.GetStorageByName.
storagePathHeader = "gitaly-storage-path"
// repoPathHeader carries the repository path resolved through the locator.
repoPathHeader = "gitaly-repo-path"
// glRepositoryHeader carries the repository's GlRepository field.
glRepositoryHeader = "gitaly-gl-repository"
// repoAltDirsHeader carries the alternate object directories followed by the
// object directory, joined with os.PathListSeparator.
repoAltDirsHeader = "gitaly-repo-alt-dirs"
)
// SetHeaders returns a context derived from ctx whose outgoing metadata tells
// gitaly-ruby where the repository lives on disk. It delegates to setHeaders
// with mustExist enabled, so the repository path is resolved through
// locator.GetRepoPath. Any error from the locator is returned as-is.
func SetHeaders(ctx context.Context, locator storage.Locator, repo *gitalypb.Repository) (context.Context, error) {
	const mustExist = true

	return setHeaders(ctx, locator, repo, mustExist)
}
// setHeaders builds the gRPC metadata gitaly-ruby needs to locate the
// repository and returns a context derived from ctx that carries it as
// outgoing metadata.
//
// The metadata contains the storage path, the repository path, the repository's
// GlRepository value and its alternate object directories followed by its
// object directory, joined with os.PathListSeparator. When mustExist is true the
// repository path is resolved with locator.GetRepoPath, otherwise with
// locator.GetPath.
//
// A fixed allowlist of incoming headers, plus every incoming header whose name
// starts with rubyFeaturePrefix, is copied from ctx's incoming metadata.
//
// If the locator fails, the returned context is nil and the locator's error is
// returned unchanged.
func setHeaders(ctx context.Context, locator storage.Locator, repo *gitalypb.Repository, mustExist bool) (context.Context, error) {
	storagePath, err := locator.GetStorageByName(repo.GetStorageName())
	if err != nil {
		return nil, err
	}

	var repoPath string
	if mustExist {
		repoPath, err = locator.GetRepoPath(repo)
	} else {
		repoPath, err = locator.GetPath(repo)
	}
	if err != nil {
		return nil, err
	}

	// Assemble the directory list in a fresh slice. Appending straight onto the
	// slice returned by the getter could write into spare capacity of the
	// request message's backing array, which the caller still owns.
	altDirs := repo.GetGitAlternateObjectDirectories()
	repoAltDirs := make([]string, 0, len(altDirs)+1)
	repoAltDirs = append(repoAltDirs, altDirs...)
	repoAltDirs = append(repoAltDirs, repo.GetGitObjectDirectory())
	repoAltDirsCombined := strings.Join(repoAltDirs, string(os.PathListSeparator))

	md := metadata.Pairs(
		storagePathHeader, storagePath,
		repoPathHeader, repoPath,
		// Use the generated getter like every other field read in this function.
		glRepositoryHeader, repo.GetGlRepository(),
		repoAltDirsHeader, repoAltDirsCombined,
	)

	// list of http/2 headers that will be forwarded as-is to gitaly-ruby
	proxyHeaderAllowlist := []string{
		"gitaly-servers",
		txinfo.TransactionMetadataKey,
	}

	if inMD, ok := metadata.FromIncomingContext(ctx); ok {
		// Automatically allowlist any Ruby-specific feature flag
		for header := range inMD {
			if strings.HasPrefix(header, rubyFeaturePrefix) {
				proxyHeaderAllowlist = append(proxyHeaderAllowlist, header)
			}
		}

		// Copy every value of each allowlisted header. Append is a no-op for
		// headers that are absent from the incoming metadata.
		for _, header := range proxyHeaderAllowlist {
			md.Append(header, inMD[header]...)
		}
	}

	return metadata.NewOutgoingContext(ctx, md), nil
}
// Proxy invokes recvSend repeatedly for as long as it keeps returning nil.
// The first non-nil error ends the loop: io.EOF is treated as a clean end and
// reported as nil, any other error is handed back to the caller unchanged.
func Proxy(recvSend func() error) (err error) {
	for {
		switch err = recvSend(); err {
		case nil:
			// Nothing went wrong; keep pumping.
			continue
		case io.EOF:
			return nil
		default:
			return err
		}
	}
}
// CloseSender is satisfied by any value that has a CloseSend method, such as
// a gRPC stream. It lets code accept a stream purely through that one method
// without depending on the concrete stream type.
type CloseSender interface {
// CloseSend reports any error encountered by the underlying implementation.
CloseSend() error
}
|