Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'workhorse/internal/gitaly')
-rw-r--r--workhorse/internal/gitaly/blob.go41
-rw-r--r--workhorse/internal/gitaly/diff.go55
-rw-r--r--workhorse/internal/gitaly/gitaly.go188
-rw-r--r--workhorse/internal/gitaly/gitaly_test.go80
-rw-r--r--workhorse/internal/gitaly/namespace.go8
-rw-r--r--workhorse/internal/gitaly/repository.go45
-rw-r--r--workhorse/internal/gitaly/smarthttp.go139
-rw-r--r--workhorse/internal/gitaly/unmarshal_test.go35
8 files changed, 591 insertions, 0 deletions
diff --git a/workhorse/internal/gitaly/blob.go b/workhorse/internal/gitaly/blob.go
new file mode 100644
index 00000000000..c6f5d6676f3
--- /dev/null
+++ b/workhorse/internal/gitaly/blob.go
@@ -0,0 +1,41 @@
+package gitaly
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "net/http"
+ "strconv"
+
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "gitlab.com/gitlab-org/gitaly/streamio"
+)
+
+type BlobClient struct {
+ gitalypb.BlobServiceClient
+}
+
+func (client *BlobClient) SendBlob(ctx context.Context, w http.ResponseWriter, request *gitalypb.GetBlobRequest) error {
+ c, err := client.GetBlob(ctx, request)
+ if err != nil {
+ return fmt.Errorf("rpc failed: %v", err)
+ }
+
+ firstResponseReceived := false
+ rr := streamio.NewReader(func() ([]byte, error) {
+ resp, err := c.Recv()
+
+ if !firstResponseReceived && err == nil {
+ firstResponseReceived = true
+ w.Header().Set("Content-Length", strconv.FormatInt(resp.GetSize(), 10))
+ }
+
+ return resp.GetData(), err
+ })
+
+ if _, err := io.Copy(w, rr); err != nil {
+ return fmt.Errorf("copy rpc data: %v", err)
+ }
+
+ return nil
+}
diff --git a/workhorse/internal/gitaly/diff.go b/workhorse/internal/gitaly/diff.go
new file mode 100644
index 00000000000..035a58ec6fd
--- /dev/null
+++ b/workhorse/internal/gitaly/diff.go
@@ -0,0 +1,55 @@
+package gitaly
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "net/http"
+
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "gitlab.com/gitlab-org/gitaly/streamio"
+)
+
+type DiffClient struct {
+ gitalypb.DiffServiceClient
+}
+
+func (client *DiffClient) SendRawDiff(ctx context.Context, w http.ResponseWriter, request *gitalypb.RawDiffRequest) error {
+ c, err := client.RawDiff(ctx, request)
+ if err != nil {
+ return fmt.Errorf("rpc failed: %v", err)
+ }
+
+ w.Header().Del("Content-Length")
+
+ rr := streamio.NewReader(func() ([]byte, error) {
+ resp, err := c.Recv()
+ return resp.GetData(), err
+ })
+
+ if _, err := io.Copy(w, rr); err != nil {
+ return fmt.Errorf("copy rpc data: %v", err)
+ }
+
+ return nil
+}
+
+func (client *DiffClient) SendRawPatch(ctx context.Context, w http.ResponseWriter, request *gitalypb.RawPatchRequest) error {
+ c, err := client.RawPatch(ctx, request)
+ if err != nil {
+ return fmt.Errorf("rpc failed: %v", err)
+ }
+
+ w.Header().Del("Content-Length")
+
+ rr := streamio.NewReader(func() ([]byte, error) {
+ resp, err := c.Recv()
+ return resp.GetData(), err
+ })
+
+ if _, err := io.Copy(w, rr); err != nil {
+ return fmt.Errorf("copy rpc data: %v", err)
+ }
+
+ return nil
+}
diff --git a/workhorse/internal/gitaly/gitaly.go b/workhorse/internal/gitaly/gitaly.go
new file mode 100644
index 00000000000..c739ac8d9b2
--- /dev/null
+++ b/workhorse/internal/gitaly/gitaly.go
@@ -0,0 +1,188 @@
+package gitaly
+
+import (
+ "context"
+ "strings"
+ "sync"
+
+ "github.com/golang/protobuf/jsonpb" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab-workhorse/-/issues/274
+ "github.com/golang/protobuf/proto" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab-workhorse/-/issues/274
+ grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
+ grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+ gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
+ gitalyclient "gitlab.com/gitlab-org/gitaly/client"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/metadata"
+
+ grpccorrelation "gitlab.com/gitlab-org/labkit/correlation/grpc"
+ grpctracing "gitlab.com/gitlab-org/labkit/tracing/grpc"
+)
+
+type Server struct {
+ Address string `json:"address"`
+ Token string `json:"token"`
+ Features map[string]string `json:"features"`
+}
+
+type cacheKey struct{ address, token string }
+
+func (server Server) cacheKey() cacheKey {
+ return cacheKey{address: server.Address, token: server.Token}
+}
+
+type connectionsCache struct {
+ sync.RWMutex
+ connections map[cacheKey]*grpc.ClientConn
+}
+
+var (
+ jsonUnMarshaler = jsonpb.Unmarshaler{AllowUnknownFields: true}
+ cache = connectionsCache{
+ connections: make(map[cacheKey]*grpc.ClientConn),
+ }
+
+ connectionsTotal = promauto.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: "gitlab_workhorse_gitaly_connections_total",
+ Help: "Number of Gitaly connections that have been established",
+ },
+ []string{"status"},
+ )
+)
+
+func withOutgoingMetadata(ctx context.Context, features map[string]string) context.Context {
+ md := metadata.New(nil)
+ for k, v := range features {
+ if !strings.HasPrefix(k, "gitaly-feature-") {
+ continue
+ }
+ md.Append(k, v)
+ }
+
+ return metadata.NewOutgoingContext(ctx, md)
+}
+
+func NewSmartHTTPClient(ctx context.Context, server Server) (context.Context, *SmartHTTPClient, error) {
+ conn, err := getOrCreateConnection(server)
+ if err != nil {
+ return nil, nil, err
+ }
+ grpcClient := gitalypb.NewSmartHTTPServiceClient(conn)
+ return withOutgoingMetadata(ctx, server.Features), &SmartHTTPClient{grpcClient}, nil
+}
+
+func NewBlobClient(ctx context.Context, server Server) (context.Context, *BlobClient, error) {
+ conn, err := getOrCreateConnection(server)
+ if err != nil {
+ return nil, nil, err
+ }
+ grpcClient := gitalypb.NewBlobServiceClient(conn)
+ return withOutgoingMetadata(ctx, server.Features), &BlobClient{grpcClient}, nil
+}
+
+func NewRepositoryClient(ctx context.Context, server Server) (context.Context, *RepositoryClient, error) {
+ conn, err := getOrCreateConnection(server)
+ if err != nil {
+ return nil, nil, err
+ }
+ grpcClient := gitalypb.NewRepositoryServiceClient(conn)
+ return withOutgoingMetadata(ctx, server.Features), &RepositoryClient{grpcClient}, nil
+}
+
+// NewNamespaceClient is only used by the Gitaly integration tests at present
+func NewNamespaceClient(ctx context.Context, server Server) (context.Context, *NamespaceClient, error) {
+ conn, err := getOrCreateConnection(server)
+ if err != nil {
+ return nil, nil, err
+ }
+ grpcClient := gitalypb.NewNamespaceServiceClient(conn)
+ return withOutgoingMetadata(ctx, server.Features), &NamespaceClient{grpcClient}, nil
+}
+
+func NewDiffClient(ctx context.Context, server Server) (context.Context, *DiffClient, error) {
+ conn, err := getOrCreateConnection(server)
+ if err != nil {
+ return nil, nil, err
+ }
+ grpcClient := gitalypb.NewDiffServiceClient(conn)
+ return withOutgoingMetadata(ctx, server.Features), &DiffClient{grpcClient}, nil
+}
+
+func getOrCreateConnection(server Server) (*grpc.ClientConn, error) {
+ key := server.cacheKey()
+
+ cache.RLock()
+ conn := cache.connections[key]
+ cache.RUnlock()
+
+ if conn != nil {
+ return conn, nil
+ }
+
+ cache.Lock()
+ defer cache.Unlock()
+
+ if conn := cache.connections[key]; conn != nil {
+ return conn, nil
+ }
+
+ conn, err := newConnection(server)
+ if err != nil {
+ return nil, err
+ }
+
+ cache.connections[key] = conn
+
+ return conn, nil
+}
+
+func CloseConnections() {
+ cache.Lock()
+ defer cache.Unlock()
+
+ for _, conn := range cache.connections {
+ conn.Close()
+ }
+}
+
+func newConnection(server Server) (*grpc.ClientConn, error) {
+ connOpts := append(gitalyclient.DefaultDialOpts,
+ grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(server.Token)),
+ grpc.WithStreamInterceptor(
+ grpc_middleware.ChainStreamClient(
+ grpctracing.StreamClientTracingInterceptor(),
+ grpc_prometheus.StreamClientInterceptor,
+ grpccorrelation.StreamClientCorrelationInterceptor(
+ grpccorrelation.WithClientName("gitlab-workhorse"),
+ ),
+ ),
+ ),
+
+ grpc.WithUnaryInterceptor(
+ grpc_middleware.ChainUnaryClient(
+ grpctracing.UnaryClientTracingInterceptor(),
+ grpc_prometheus.UnaryClientInterceptor,
+ grpccorrelation.UnaryClientCorrelationInterceptor(
+ grpccorrelation.WithClientName("gitlab-workhorse"),
+ ),
+ ),
+ ),
+ )
+
+ conn, connErr := gitalyclient.Dial(server.Address, connOpts)
+
+ label := "ok"
+ if connErr != nil {
+ label = "fail"
+ }
+ connectionsTotal.WithLabelValues(label).Inc()
+
+ return conn, connErr
+}
+
+func UnmarshalJSON(s string, msg proto.Message) error {
+ return jsonUnMarshaler.Unmarshal(strings.NewReader(s), msg)
+}
diff --git a/workhorse/internal/gitaly/gitaly_test.go b/workhorse/internal/gitaly/gitaly_test.go
new file mode 100644
index 00000000000..b17fb5c1d7b
--- /dev/null
+++ b/workhorse/internal/gitaly/gitaly_test.go
@@ -0,0 +1,80 @@
+package gitaly
+
+import (
+ "context"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "google.golang.org/grpc/metadata"
+)
+
+func TestNewSmartHTTPClient(t *testing.T) {
+ ctx, _, err := NewSmartHTTPClient(context.Background(), serverFixture())
+ require.NoError(t, err)
+ testOutgoingMetadata(t, ctx)
+}
+
+func TestNewBlobClient(t *testing.T) {
+ ctx, _, err := NewBlobClient(context.Background(), serverFixture())
+ require.NoError(t, err)
+ testOutgoingMetadata(t, ctx)
+}
+
+func TestNewRepositoryClient(t *testing.T) {
+ ctx, _, err := NewRepositoryClient(context.Background(), serverFixture())
+ require.NoError(t, err)
+ testOutgoingMetadata(t, ctx)
+}
+
+func TestNewNamespaceClient(t *testing.T) {
+ ctx, _, err := NewNamespaceClient(context.Background(), serverFixture())
+ require.NoError(t, err)
+ testOutgoingMetadata(t, ctx)
+}
+
+func TestNewDiffClient(t *testing.T) {
+ ctx, _, err := NewDiffClient(context.Background(), serverFixture())
+ require.NoError(t, err)
+ testOutgoingMetadata(t, ctx)
+}
+
+func testOutgoingMetadata(t *testing.T, ctx context.Context) {
+ md, ok := metadata.FromOutgoingContext(ctx)
+ require.True(t, ok, "get metadata from context")
+
+ for k, v := range allowedFeatures() {
+ actual := md[k]
+ require.Len(t, actual, 1, "expect one value for %v", k)
+ require.Equal(t, v, actual[0], "value for %v", k)
+ }
+
+ for k := range badFeatureMetadata() {
+ require.Empty(t, md[k], "value for bad key %v", k)
+ }
+}
+
+func serverFixture() Server {
+ features := make(map[string]string)
+ for k, v := range allowedFeatures() {
+ features[k] = v
+ }
+ for k, v := range badFeatureMetadata() {
+ features[k] = v
+ }
+
+ return Server{Address: "tcp://localhost:123", Features: features}
+}
+
+func allowedFeatures() map[string]string {
+ return map[string]string{
+ "gitaly-feature-foo": "bar",
+ "gitaly-feature-qux": "baz",
+ }
+}
+
+func badFeatureMetadata() map[string]string {
+ return map[string]string{
+ "bad-metadata-1": "bad-value-1",
+ "bad-metadata-2": "bad-value-2",
+ }
+}
diff --git a/workhorse/internal/gitaly/namespace.go b/workhorse/internal/gitaly/namespace.go
new file mode 100644
index 00000000000..6db6ed4fc32
--- /dev/null
+++ b/workhorse/internal/gitaly/namespace.go
@@ -0,0 +1,8 @@
+package gitaly
+
+import "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+
+// NamespaceClient encapsulates NamespaceService calls
+type NamespaceClient struct {
+ gitalypb.NamespaceServiceClient
+}
diff --git a/workhorse/internal/gitaly/repository.go b/workhorse/internal/gitaly/repository.go
new file mode 100644
index 00000000000..e3ec3257a85
--- /dev/null
+++ b/workhorse/internal/gitaly/repository.go
@@ -0,0 +1,45 @@
+package gitaly
+
+import (
+ "context"
+ "fmt"
+ "io"
+
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "gitlab.com/gitlab-org/gitaly/streamio"
+)
+
+// RepositoryClient encapsulates RepositoryService calls
+type RepositoryClient struct {
+ gitalypb.RepositoryServiceClient
+}
+
+// ArchiveReader performs a GetArchive Gitaly request and returns an io.Reader
+// for the response
+func (client *RepositoryClient) ArchiveReader(ctx context.Context, request *gitalypb.GetArchiveRequest) (io.Reader, error) {
+ c, err := client.GetArchive(ctx, request)
+ if err != nil {
+ return nil, fmt.Errorf("RepositoryService::GetArchive: %v", err)
+ }
+
+ return streamio.NewReader(func() ([]byte, error) {
+ resp, err := c.Recv()
+
+ return resp.GetData(), err
+ }), nil
+}
+
+// SnapshotReader performs a GetSnapshot Gitaly request and returns an io.Reader
+// for the response
+func (client *RepositoryClient) SnapshotReader(ctx context.Context, request *gitalypb.GetSnapshotRequest) (io.Reader, error) {
+ c, err := client.GetSnapshot(ctx, request)
+ if err != nil {
+ return nil, fmt.Errorf("RepositoryService::GetSnapshot: %v", err)
+ }
+
+ return streamio.NewReader(func() ([]byte, error) {
+ resp, err := c.Recv()
+
+ return resp.GetData(), err
+ }), nil
+}
diff --git a/workhorse/internal/gitaly/smarthttp.go b/workhorse/internal/gitaly/smarthttp.go
new file mode 100644
index 00000000000..d1fe6fae5ba
--- /dev/null
+++ b/workhorse/internal/gitaly/smarthttp.go
@@ -0,0 +1,139 @@
+package gitaly
+
+import (
+ "context"
+ "fmt"
+ "io"
+
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "gitlab.com/gitlab-org/gitaly/streamio"
+)
+
+type SmartHTTPClient struct {
+ gitalypb.SmartHTTPServiceClient
+}
+
+func (client *SmartHTTPClient) InfoRefsResponseReader(ctx context.Context, repo *gitalypb.Repository, rpc string, gitConfigOptions []string, gitProtocol string) (io.Reader, error) {
+ rpcRequest := &gitalypb.InfoRefsRequest{
+ Repository: repo,
+ GitConfigOptions: gitConfigOptions,
+ GitProtocol: gitProtocol,
+ }
+
+ switch rpc {
+ case "git-upload-pack":
+ stream, err := client.InfoRefsUploadPack(ctx, rpcRequest)
+ return infoRefsReader(stream), err
+ case "git-receive-pack":
+ stream, err := client.InfoRefsReceivePack(ctx, rpcRequest)
+ return infoRefsReader(stream), err
+ default:
+ return nil, fmt.Errorf("InfoRefsResponseWriterTo: Unsupported RPC: %q", rpc)
+ }
+}
+
+type infoRefsClient interface {
+ Recv() (*gitalypb.InfoRefsResponse, error)
+}
+
+func infoRefsReader(stream infoRefsClient) io.Reader {
+ return streamio.NewReader(func() ([]byte, error) {
+ resp, err := stream.Recv()
+ return resp.GetData(), err
+ })
+}
+
+func (client *SmartHTTPClient) ReceivePack(ctx context.Context, repo *gitalypb.Repository, glId string, glUsername string, glRepository string, gitConfigOptions []string, clientRequest io.Reader, clientResponse io.Writer, gitProtocol string) error {
+ stream, err := client.PostReceivePack(ctx)
+ if err != nil {
+ return err
+ }
+
+ rpcRequest := &gitalypb.PostReceivePackRequest{
+ Repository: repo,
+ GlId: glId,
+ GlUsername: glUsername,
+ GlRepository: glRepository,
+ GitConfigOptions: gitConfigOptions,
+ GitProtocol: gitProtocol,
+ }
+
+ if err := stream.Send(rpcRequest); err != nil {
+ return fmt.Errorf("initial request: %v", err)
+ }
+
+ numStreams := 2
+ errC := make(chan error, numStreams)
+
+ go func() {
+ rr := streamio.NewReader(func() ([]byte, error) {
+ response, err := stream.Recv()
+ return response.GetData(), err
+ })
+ _, err := io.Copy(clientResponse, rr)
+ errC <- err
+ }()
+
+ go func() {
+ sw := streamio.NewWriter(func(data []byte) error {
+ return stream.Send(&gitalypb.PostReceivePackRequest{Data: data})
+ })
+ _, err := io.Copy(sw, clientRequest)
+ stream.CloseSend()
+ errC <- err
+ }()
+
+ for i := 0; i < numStreams; i++ {
+ if err := <-errC; err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (client *SmartHTTPClient) UploadPack(ctx context.Context, repo *gitalypb.Repository, clientRequest io.Reader, clientResponse io.Writer, gitConfigOptions []string, gitProtocol string) error {
+ stream, err := client.PostUploadPack(ctx)
+ if err != nil {
+ return err
+ }
+
+ rpcRequest := &gitalypb.PostUploadPackRequest{
+ Repository: repo,
+ GitConfigOptions: gitConfigOptions,
+ GitProtocol: gitProtocol,
+ }
+
+ if err := stream.Send(rpcRequest); err != nil {
+ return fmt.Errorf("initial request: %v", err)
+ }
+
+ numStreams := 2
+ errC := make(chan error, numStreams)
+
+ go func() {
+ rr := streamio.NewReader(func() ([]byte, error) {
+ response, err := stream.Recv()
+ return response.GetData(), err
+ })
+ _, err := io.Copy(clientResponse, rr)
+ errC <- err
+ }()
+
+ go func() {
+ sw := streamio.NewWriter(func(data []byte) error {
+ return stream.Send(&gitalypb.PostUploadPackRequest{Data: data})
+ })
+ _, err := io.Copy(sw, clientRequest)
+ stream.CloseSend()
+ errC <- err
+ }()
+
+ for i := 0; i < numStreams; i++ {
+ if err := <-errC; err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
diff --git a/workhorse/internal/gitaly/unmarshal_test.go b/workhorse/internal/gitaly/unmarshal_test.go
new file mode 100644
index 00000000000..e2256903339
--- /dev/null
+++ b/workhorse/internal/gitaly/unmarshal_test.go
@@ -0,0 +1,35 @@
+package gitaly
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+)
+
+func TestUnmarshalJSON(t *testing.T) {
+ testCases := []struct {
+ desc string
+ in string
+ out gitalypb.Repository
+ }{
+ {
+ desc: "basic example",
+ in: `{"relative_path":"foo/bar.git"}`,
+ out: gitalypb.Repository{RelativePath: "foo/bar.git"},
+ },
+ {
+ desc: "unknown field",
+ in: `{"relative_path":"foo/bar.git","unknown_field":12345}`,
+ out: gitalypb.Repository{RelativePath: "foo/bar.git"},
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.desc, func(t *testing.T) {
+ result := gitalypb.Repository{}
+ require.NoError(t, UnmarshalJSON(tc.in, &result))
+ require.Equal(t, tc.out, result)
+ })
+ }
+}