Welcome to mirror list, hosted at ThFree Co, Russian Federation.

proxy.go « rubyserver « internal - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 29aa9781478082b3a9152438afdf4315d141dce4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package rubyserver

import (
	"context"
	"io"
	"os"
	"strings"

	pb "gitlab.com/gitlab-org/gitaly-proto/go"
	"gitlab.com/gitlab-org/gitaly/internal/helper"

	"google.golang.org/grpc/metadata"
)

// ProxyHeaderWhitelist is the list of http/2 headers that will be
// forwarded as-is to gitaly-ruby. For every name in this list, setHeaders
// copies each value found in the incoming gRPC metadata into the outgoing
// metadata under the same name; incoming headers not listed here are not
// copied.
var ProxyHeaderWhitelist = []string{"gitaly-servers"}

// Names of the gRPC metadata keys that setHeaders attaches to the outgoing
// context:
//
//   - storagePathHeader carries the storage path resolved from the
//     repository's storage name.
//   - repoPathHeader carries the resolved repository path.
//   - glRepositoryHeader carries the repository's GlRepository value.
//   - repoAltDirsHeader carries the repository's alternate object
//     directories followed by its object directory, joined with
//     os.PathListSeparator.
const (
	storagePathHeader  = "gitaly-storage-path"
	repoPathHeader     = "gitaly-repo-path"
	glRepositoryHeader = "gitaly-gl-repository"
	repoAltDirsHeader  = "gitaly-repo-alt-dirs"
)

// SetHeadersWithoutRepoCheck adds headers that tell gitaly-ruby the full
// path to the repository. It is not an error if the repository does not
// yet exist. This can be used on RPC calls that will create a
// repository.
//
// It delegates to setHeaders with mustExist set to false, so the
// repository path is resolved with helper.GetPath instead of
// helper.GetRepoPath. On success the returned context carries the headers
// as outgoing gRPC metadata; on failure the returned context is nil and
// the error is non-nil.
func SetHeadersWithoutRepoCheck(ctx context.Context, repo *pb.Repository) (context.Context, error) {
	return setHeaders(ctx, repo, false)
}

// SetHeaders adds headers that tell gitaly-ruby the full path to the repository.
//
// It delegates to setHeaders with mustExist set to true, so the repository
// path is resolved with helper.GetRepoPath and any error from that lookup
// is returned to the caller. On success the returned context carries the
// headers as outgoing gRPC metadata; on failure the returned context is
// nil and the error is non-nil.
func SetHeaders(ctx context.Context, repo *pb.Repository) (context.Context, error) {
	return setHeaders(ctx, repo, true)
}

// setHeaders returns a copy of ctx whose outgoing gRPC metadata describes
// repo to gitaly-ruby.
//
// The metadata contains the storage path, the repository path, the
// GlRepository value and the repository's object directories (alternates
// first, then the main object directory) joined with
// os.PathListSeparator. Values of the headers named in
// ProxyHeaderWhitelist are additionally copied over from the incoming
// metadata of ctx, if any.
//
// When mustExist is true the repository path is resolved with
// helper.GetRepoPath, otherwise with helper.GetPath. If the storage or the
// repository path cannot be resolved, a nil context and the lookup error
// are returned.
func setHeaders(ctx context.Context, repo *pb.Repository, mustExist bool) (context.Context, error) {
	storagePath, err := helper.GetStorageByName(repo.GetStorageName())
	if err != nil {
		return nil, err
	}

	var repoPath string
	if mustExist {
		repoPath, err = helper.GetRepoPath(repo)
	} else {
		repoPath, err = helper.GetPath(repo)
	}
	if err != nil {
		return nil, err
	}

	// Build the directory list in a slice we own. Appending directly to the
	// slice returned by the getter could write into the spare capacity of
	// the request message's backing array.
	altDirs := repo.GetGitAlternateObjectDirectories()
	repoAltDirs := make([]string, 0, len(altDirs)+1)
	repoAltDirs = append(repoAltDirs, altDirs...)
	repoAltDirs = append(repoAltDirs, repo.GetGitObjectDirectory())
	repoAltDirsCombined := strings.Join(repoAltDirs, string(os.PathListSeparator))

	md := metadata.Pairs(
		storagePathHeader, storagePath,
		repoPathHeader, repoPath,
		// Use the nil-safe getter, like every other field read above.
		glRepositoryHeader, repo.GetGlRepository(),
		repoAltDirsHeader, repoAltDirsCombined,
	)

	// Forward whitelisted headers from the incoming request unchanged.
	if inMD, ok := metadata.FromIncomingContext(ctx); ok {
		for _, header := range ProxyHeaderWhitelist {
			for _, v := range inMD[header] {
				md = metadata.Join(md, metadata.Pairs(header, v))
			}
		}
	}

	return metadata.NewOutgoingContext(ctx, md), nil
}

// Proxy invokes recvSend repeatedly and stops at the first non-nil error.
// An io.EOF result is treated as a clean end of stream and reported as
// nil; any other error is handed back to the caller unchanged.
func Proxy(recvSend func() error) (err error) {
	for {
		if err = recvSend(); err != nil {
			break
		}
	}

	if err == io.EOF {
		return nil
	}
	return err
}

// CloseSender captures the CloseSend method from gRPC streams.
// ProxyBidi calls CloseSend on the value it is given once proxying of the
// request direction has finished without error.
type CloseSender interface {
	// CloseSend signals that no more messages will be sent on the stream.
	CloseSend() error
}

// ProxyBidi proxies both directions of a stream concurrently, running
// Proxy(requestFunc) and Proxy(responseFunc) in separate goroutines.
//
// If either direction fails, its error is returned right away. When the
// request direction ends cleanly, requestStream.CloseSend is called and
// ProxyBidi keeps waiting for the response direction. As soon as the
// response direction is done its result is returned, without waiting for
// the request direction to finish.
func ProxyBidi(requestFunc func() error, requestStream CloseSender, responseFunc func() error) error {
	// Both channels have room for one value so that each goroutine can
	// deliver its result and exit even when nobody receives it anymore.
	reqDone := make(chan error, 1)
	respDone := make(chan error, 1)

	go func() { reqDone <- Proxy(requestFunc) }()
	go func() { respDone <- Proxy(responseFunc) }()

	select {
	case respErr := <-respDone:
		return respErr
	case reqErr := <-reqDone:
		if reqErr != nil {
			return reqErr
		}
	}

	// The request side finished cleanly: half-close the stream (the result
	// of CloseSend is not inspected) and wait for the response side, which
	// is the only result still outstanding.
	requestStream.CloseSend()
	return <-respDone
}