diff options
Diffstat (limited to 'workhorse/internal/gitaly')
-rw-r--r-- | workhorse/internal/gitaly/blob.go | 41 | ||||
-rw-r--r-- | workhorse/internal/gitaly/diff.go | 55 | ||||
-rw-r--r-- | workhorse/internal/gitaly/gitaly.go | 188 | ||||
-rw-r--r-- | workhorse/internal/gitaly/gitaly_test.go | 80 | ||||
-rw-r--r-- | workhorse/internal/gitaly/namespace.go | 8 | ||||
-rw-r--r-- | workhorse/internal/gitaly/repository.go | 45 | ||||
-rw-r--r-- | workhorse/internal/gitaly/smarthttp.go | 139 | ||||
-rw-r--r-- | workhorse/internal/gitaly/unmarshal_test.go | 35 |
8 files changed, 591 insertions, 0 deletions
diff --git a/workhorse/internal/gitaly/blob.go b/workhorse/internal/gitaly/blob.go new file mode 100644 index 00000000000..c6f5d6676f3 --- /dev/null +++ b/workhorse/internal/gitaly/blob.go @@ -0,0 +1,41 @@ +package gitaly + +import ( + "context" + "fmt" + "io" + "net/http" + "strconv" + + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "gitlab.com/gitlab-org/gitaly/streamio" +) + +type BlobClient struct { + gitalypb.BlobServiceClient +} + +func (client *BlobClient) SendBlob(ctx context.Context, w http.ResponseWriter, request *gitalypb.GetBlobRequest) error { + c, err := client.GetBlob(ctx, request) + if err != nil { + return fmt.Errorf("rpc failed: %v", err) + } + + firstResponseReceived := false + rr := streamio.NewReader(func() ([]byte, error) { + resp, err := c.Recv() + + if !firstResponseReceived && err == nil { + firstResponseReceived = true + w.Header().Set("Content-Length", strconv.FormatInt(resp.GetSize(), 10)) + } + + return resp.GetData(), err + }) + + if _, err := io.Copy(w, rr); err != nil { + return fmt.Errorf("copy rpc data: %v", err) + } + + return nil +} diff --git a/workhorse/internal/gitaly/diff.go b/workhorse/internal/gitaly/diff.go new file mode 100644 index 00000000000..035a58ec6fd --- /dev/null +++ b/workhorse/internal/gitaly/diff.go @@ -0,0 +1,55 @@ +package gitaly + +import ( + "context" + "fmt" + "io" + "net/http" + + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "gitlab.com/gitlab-org/gitaly/streamio" +) + +type DiffClient struct { + gitalypb.DiffServiceClient +} + +func (client *DiffClient) SendRawDiff(ctx context.Context, w http.ResponseWriter, request *gitalypb.RawDiffRequest) error { + c, err := client.RawDiff(ctx, request) + if err != nil { + return fmt.Errorf("rpc failed: %v", err) + } + + w.Header().Del("Content-Length") + + rr := streamio.NewReader(func() ([]byte, error) { + resp, err := c.Recv() + return resp.GetData(), err + }) + + if _, err := io.Copy(w, rr); err != nil { + return fmt.Errorf("copy rpc data: %v", err) + } + + return nil +} + +func (client *DiffClient) SendRawPatch(ctx context.Context, w http.ResponseWriter, request *gitalypb.RawPatchRequest) error { + c, err := client.RawPatch(ctx, request) + if err != nil { + return fmt.Errorf("rpc failed: %v", err) + } + + w.Header().Del("Content-Length") + + rr := streamio.NewReader(func() ([]byte, error) { + resp, err := c.Recv() + return resp.GetData(), err + }) + + if _, err := io.Copy(w, rr); err != nil { + return fmt.Errorf("copy rpc data: %v", err) + } + + return nil +} diff --git a/workhorse/internal/gitaly/gitaly.go b/workhorse/internal/gitaly/gitaly.go new file mode 100644 index 00000000000..c739ac8d9b2 --- /dev/null +++ b/workhorse/internal/gitaly/gitaly.go @@ -0,0 +1,188 @@ +package gitaly + +import ( + "context" + "strings" + "sync" + + "github.com/golang/protobuf/jsonpb" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab-workhorse/-/issues/274 + "github.com/golang/protobuf/proto" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab-workhorse/-/issues/274 + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + gitalyauth "gitlab.com/gitlab-org/gitaly/auth" + gitalyclient "gitlab.com/gitlab-org/gitaly/client" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" + + grpccorrelation "gitlab.com/gitlab-org/labkit/correlation/grpc" + grpctracing "gitlab.com/gitlab-org/labkit/tracing/grpc" +) + +type Server struct { + Address string `json:"address"` + Token string `json:"token"` + Features map[string]string `json:"features"` +} + +type cacheKey struct{ address, token string } + +func (server Server) cacheKey() cacheKey { + return cacheKey{address: server.Address, token: server.Token} +} + +type connectionsCache struct { + sync.RWMutex + connections map[cacheKey]*grpc.ClientConn +} + +var ( + jsonUnMarshaler = jsonpb.Unmarshaler{AllowUnknownFields: true} + cache = connectionsCache{ + connections: make(map[cacheKey]*grpc.ClientConn), + } + + connectionsTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "gitlab_workhorse_gitaly_connections_total", + Help: "Number of Gitaly connections that have been established", + }, + []string{"status"}, + ) +) + +func withOutgoingMetadata(ctx context.Context, features map[string]string) context.Context { + md := metadata.New(nil) + for k, v := range features { + if !strings.HasPrefix(k, "gitaly-feature-") { + continue + } + md.Append(k, v) + } + + return metadata.NewOutgoingContext(ctx, md) +} + +func NewSmartHTTPClient(ctx context.Context, server Server) (context.Context, *SmartHTTPClient, error) { + conn, err := getOrCreateConnection(server) + if err != nil { + return nil, nil, err + } + grpcClient := gitalypb.NewSmartHTTPServiceClient(conn) + return withOutgoingMetadata(ctx, server.Features), &SmartHTTPClient{grpcClient}, nil +} + +func NewBlobClient(ctx context.Context, server Server) (context.Context, *BlobClient, error) { + conn, err := getOrCreateConnection(server) + if err != nil { + return nil, nil, err + } + grpcClient := gitalypb.NewBlobServiceClient(conn) + return withOutgoingMetadata(ctx, server.Features), &BlobClient{grpcClient}, nil +} + +func NewRepositoryClient(ctx context.Context, server Server) (context.Context, *RepositoryClient, error) { + conn, err := getOrCreateConnection(server) + if err != nil { + return nil, nil, err + } + grpcClient := gitalypb.NewRepositoryServiceClient(conn) + return withOutgoingMetadata(ctx, server.Features), &RepositoryClient{grpcClient}, nil +} + +// NewNamespaceClient is only used by the Gitaly integration tests at present +func NewNamespaceClient(ctx context.Context, server Server) (context.Context, *NamespaceClient, error) { + conn, err := getOrCreateConnection(server) + if err != nil { + return nil, nil, err + } + grpcClient := gitalypb.NewNamespaceServiceClient(conn) + return withOutgoingMetadata(ctx, server.Features), &NamespaceClient{grpcClient}, nil +} + +func NewDiffClient(ctx context.Context, server Server) (context.Context, *DiffClient, error) { + conn, err := getOrCreateConnection(server) + if err != nil { + return nil, nil, err + } + grpcClient := gitalypb.NewDiffServiceClient(conn) + return withOutgoingMetadata(ctx, server.Features), &DiffClient{grpcClient}, nil +} + +func getOrCreateConnection(server Server) (*grpc.ClientConn, error) { + key := server.cacheKey() + + cache.RLock() + conn := cache.connections[key] + cache.RUnlock() + + if conn != nil { + return conn, nil + } + + cache.Lock() + defer cache.Unlock() + + if conn := cache.connections[key]; conn != nil { + return conn, nil + } + + conn, err := newConnection(server) + if err != nil { + return nil, err + } + + cache.connections[key] = conn + + return conn, nil +} + +func CloseConnections() { + cache.Lock() + defer cache.Unlock() + + for _, conn := range cache.connections { + conn.Close() + } +} + +func newConnection(server Server) (*grpc.ClientConn, error) { + connOpts := append(gitalyclient.DefaultDialOpts, + grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(server.Token)), + grpc.WithStreamInterceptor( + grpc_middleware.ChainStreamClient( + grpctracing.StreamClientTracingInterceptor(), + grpc_prometheus.StreamClientInterceptor, + grpccorrelation.StreamClientCorrelationInterceptor( + grpccorrelation.WithClientName("gitlab-workhorse"), + ), + ), + ), + + grpc.WithUnaryInterceptor( + grpc_middleware.ChainUnaryClient( + grpctracing.UnaryClientTracingInterceptor(), + grpc_prometheus.UnaryClientInterceptor, + grpccorrelation.UnaryClientCorrelationInterceptor( + grpccorrelation.WithClientName("gitlab-workhorse"), + ), + ), + ), + ) + + conn, connErr := gitalyclient.Dial(server.Address, connOpts) + + label := "ok" + if connErr != nil { + label = "fail" + } + connectionsTotal.WithLabelValues(label).Inc() + + return conn, connErr +} + +func UnmarshalJSON(s string, msg proto.Message) error { + return jsonUnMarshaler.Unmarshal(strings.NewReader(s), msg) +} diff --git a/workhorse/internal/gitaly/gitaly_test.go b/workhorse/internal/gitaly/gitaly_test.go new file mode 100644 index 00000000000..b17fb5c1d7b --- /dev/null +++ b/workhorse/internal/gitaly/gitaly_test.go @@ -0,0 +1,80 @@ +package gitaly + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "google.golang.org/grpc/metadata" +) + +func TestNewSmartHTTPClient(t *testing.T) { + ctx, _, err := NewSmartHTTPClient(context.Background(), serverFixture()) + require.NoError(t, err) + testOutgoingMetadata(t, ctx) +} + +func TestNewBlobClient(t *testing.T) { + ctx, _, err := NewBlobClient(context.Background(), serverFixture()) + require.NoError(t, err) + testOutgoingMetadata(t, ctx) +} + +func TestNewRepositoryClient(t *testing.T) { + ctx, _, err := NewRepositoryClient(context.Background(), serverFixture()) + require.NoError(t, err) + testOutgoingMetadata(t, ctx) +} + +func TestNewNamespaceClient(t *testing.T) { + ctx, _, err := NewNamespaceClient(context.Background(), serverFixture()) + require.NoError(t, err) + testOutgoingMetadata(t, ctx) +} + +func TestNewDiffClient(t *testing.T) { + ctx, _, err := NewDiffClient(context.Background(), serverFixture()) + require.NoError(t, err) + testOutgoingMetadata(t, ctx) +} + +func testOutgoingMetadata(t *testing.T, ctx context.Context) { + md, ok := metadata.FromOutgoingContext(ctx) + require.True(t, ok, "get metadata from context") + + for k, v := range allowedFeatures() { + actual := md[k] + require.Len(t, actual, 1, "expect one value for %v", k) + require.Equal(t, v, actual[0], "value for %v", k) + } + + for k := range badFeatureMetadata() { + require.Empty(t, md[k], "value for bad key %v", k) + } +} + +func serverFixture() Server { + features := make(map[string]string) + for k, v := range allowedFeatures() { + features[k] = v + } + for k, v := range badFeatureMetadata() { + features[k] = v + } + + return Server{Address: "tcp://localhost:123", Features: features} +} + +func allowedFeatures() map[string]string { + return map[string]string{ + "gitaly-feature-foo": "bar", + "gitaly-feature-qux": "baz", + } +} + +func badFeatureMetadata() map[string]string { + return map[string]string{ + "bad-metadata-1": "bad-value-1", + "bad-metadata-2": "bad-value-2", + } +} diff --git a/workhorse/internal/gitaly/namespace.go b/workhorse/internal/gitaly/namespace.go new file mode 100644 index 00000000000..6db6ed4fc32 --- /dev/null +++ b/workhorse/internal/gitaly/namespace.go @@ -0,0 +1,8 @@ +package gitaly + +import "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + +// NamespaceClient encapsulates NamespaceService calls +type NamespaceClient struct { + gitalypb.NamespaceServiceClient +} diff --git a/workhorse/internal/gitaly/repository.go b/workhorse/internal/gitaly/repository.go new file mode 100644 index 00000000000..e3ec3257a85 --- /dev/null +++ b/workhorse/internal/gitaly/repository.go @@ -0,0 +1,45 @@ +package gitaly + +import ( + "context" + "fmt" + "io" + + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "gitlab.com/gitlab-org/gitaly/streamio" +) + +// RepositoryClient encapsulates RepositoryService calls +type RepositoryClient struct { + gitalypb.RepositoryServiceClient +} + +// ArchiveReader performs a GetArchive Gitaly request and returns an io.Reader +// for the response +func (client *RepositoryClient) ArchiveReader(ctx context.Context, request *gitalypb.GetArchiveRequest) (io.Reader, error) { + c, err := client.GetArchive(ctx, request) + if err != nil { + return nil, fmt.Errorf("RepositoryService::GetArchive: %v", err) + } + + return streamio.NewReader(func() ([]byte, error) { + resp, err := c.Recv() + + return resp.GetData(), err + }), nil +} + +// SnapshotReader performs a GetSnapshot Gitaly request and returns an io.Reader +// for the response +func (client *RepositoryClient) SnapshotReader(ctx context.Context, request *gitalypb.GetSnapshotRequest) (io.Reader, error) { + c, err := client.GetSnapshot(ctx, request) + if err != nil { + return nil, fmt.Errorf("RepositoryService::GetSnapshot: %v", err) + } + + return streamio.NewReader(func() ([]byte, error) { + resp, err := c.Recv() + + return resp.GetData(), err + }), nil +} diff --git a/workhorse/internal/gitaly/smarthttp.go b/workhorse/internal/gitaly/smarthttp.go new file mode 100644 index 00000000000..d1fe6fae5ba --- /dev/null +++ b/workhorse/internal/gitaly/smarthttp.go @@ -0,0 +1,139 @@ +package gitaly + +import ( + "context" + "fmt" + "io" + + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "gitlab.com/gitlab-org/gitaly/streamio" +) + +type SmartHTTPClient struct { + gitalypb.SmartHTTPServiceClient +} + +func (client *SmartHTTPClient) InfoRefsResponseReader(ctx context.Context, repo *gitalypb.Repository, rpc string, gitConfigOptions []string, gitProtocol string) (io.Reader, error) { + rpcRequest := &gitalypb.InfoRefsRequest{ + Repository: repo, + GitConfigOptions: gitConfigOptions, + GitProtocol: gitProtocol, + } + + switch rpc { + case "git-upload-pack": + stream, err := client.InfoRefsUploadPack(ctx, rpcRequest) + return infoRefsReader(stream), err + case "git-receive-pack": + stream, err := client.InfoRefsReceivePack(ctx, rpcRequest) + return infoRefsReader(stream), err + default: + return nil, fmt.Errorf("InfoRefsResponseWriterTo: Unsupported RPC: %q", rpc) + } +} + +type infoRefsClient interface { + Recv() (*gitalypb.InfoRefsResponse, error) +} + +func infoRefsReader(stream infoRefsClient) io.Reader { + return streamio.NewReader(func() ([]byte, error) { + resp, err := stream.Recv() + return resp.GetData(), err + }) +} + +func (client *SmartHTTPClient) ReceivePack(ctx context.Context, repo *gitalypb.Repository, glId string, glUsername string, glRepository string, gitConfigOptions []string, clientRequest io.Reader, clientResponse io.Writer, gitProtocol string) error { + stream, err := client.PostReceivePack(ctx) + if err != nil { + return err + } + + rpcRequest := &gitalypb.PostReceivePackRequest{ + Repository: repo, + GlId: glId, + GlUsername: glUsername, + GlRepository: glRepository, + GitConfigOptions: gitConfigOptions, + GitProtocol: gitProtocol, + } + + if err := stream.Send(rpcRequest); err != nil { + return fmt.Errorf("initial request: %v", err) + } + + numStreams := 2 + errC := make(chan error, numStreams) + + go func() { + rr := streamio.NewReader(func() ([]byte, error) { + response, err := stream.Recv() + return response.GetData(), err + }) + _, err := io.Copy(clientResponse, rr) + errC <- err + }() + + go func() { + sw := streamio.NewWriter(func(data []byte) error { + return stream.Send(&gitalypb.PostReceivePackRequest{Data: data}) + }) + _, err := io.Copy(sw, clientRequest) + stream.CloseSend() + errC <- err + }() + + for i := 0; i < numStreams; i++ { + if err := <-errC; err != nil { + return err + } + } + + return nil +} + +func (client *SmartHTTPClient) UploadPack(ctx context.Context, repo *gitalypb.Repository, clientRequest io.Reader, clientResponse io.Writer, gitConfigOptions []string, gitProtocol string) error { + stream, err := client.PostUploadPack(ctx) + if err != nil { + return err + } + + rpcRequest := &gitalypb.PostUploadPackRequest{ + Repository: repo, + GitConfigOptions: gitConfigOptions, + GitProtocol: gitProtocol, + } + + if err := stream.Send(rpcRequest); err != nil { + return fmt.Errorf("initial request: %v", err) + } + + numStreams := 2 + errC := make(chan error, numStreams) + + go func() { + rr := streamio.NewReader(func() ([]byte, error) { + response, err := stream.Recv() + return response.GetData(), err + }) + _, err := io.Copy(clientResponse, rr) + errC <- err + }() + + go func() { + sw := streamio.NewWriter(func(data []byte) error { + return stream.Send(&gitalypb.PostUploadPackRequest{Data: data}) + }) + _, err := io.Copy(sw, clientRequest) + stream.CloseSend() + errC <- err + }() + + for i := 0; i < numStreams; i++ { + if err := <-errC; err != nil { + return err + } + } + + return nil +} diff --git a/workhorse/internal/gitaly/unmarshal_test.go b/workhorse/internal/gitaly/unmarshal_test.go new file mode 100644 index 00000000000..e2256903339 --- /dev/null +++ b/workhorse/internal/gitaly/unmarshal_test.go @@ -0,0 +1,35 @@ +package gitaly + +import ( + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" +) + +func TestUnmarshalJSON(t *testing.T) { + testCases := []struct { + desc string + in string + out gitalypb.Repository + }{ + { + desc: "basic example", + in: `{"relative_path":"foo/bar.git"}`, + out: gitalypb.Repository{RelativePath: "foo/bar.git"}, + }, + { + desc: "unknown field", + in: `{"relative_path":"foo/bar.git","unknown_field":12345}`, + out: gitalypb.Repository{RelativePath: "foo/bar.git"}, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + result := gitalypb.Repository{} + require.NoError(t, UnmarshalJSON(tc.in, &result)) + require.Equal(t, tc.out, result) + }) + } +} |