Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'workhorse/internal/testhelper/gitaly.go')
-rw-r--r--workhorse/internal/testhelper/gitaly.go384
1 files changed, 384 insertions, 0 deletions
diff --git a/workhorse/internal/testhelper/gitaly.go b/workhorse/internal/testhelper/gitaly.go
new file mode 100644
index 00000000000..24884505440
--- /dev/null
+++ b/workhorse/internal/testhelper/gitaly.go
@@ -0,0 +1,384 @@
+package testhelper
+
+import (
+ "fmt"
+ "io"
+ "io/ioutil"
+ "path"
+ "strings"
+ "sync"
+
+ "github.com/golang/protobuf/jsonpb" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab-workhorse/-/issues/274
+ "github.com/golang/protobuf/proto" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab-workhorse/-/issues/274
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "gitlab.com/gitlab-org/labkit/log"
+ "golang.org/x/net/context"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/status"
+)
+
+type GitalyTestServer struct {
+ finalMessageCode codes.Code
+ sync.WaitGroup
+ LastIncomingMetadata metadata.MD
+ gitalypb.UnimplementedRepositoryServiceServer
+ gitalypb.UnimplementedBlobServiceServer
+ gitalypb.UnimplementedDiffServiceServer
+}
+
+var (
+ GitalyInfoRefsResponseMock = strings.Repeat("Mock Gitaly InfoRefsResponse data", 100000)
+ GitalyGetBlobResponseMock = strings.Repeat("Mock Gitaly GetBlobResponse data", 100000)
+ GitalyGetArchiveResponseMock = strings.Repeat("Mock Gitaly GetArchiveResponse data", 100000)
+ GitalyGetDiffResponseMock = strings.Repeat("Mock Gitaly GetDiffResponse data", 100000)
+ GitalyGetPatchResponseMock = strings.Repeat("Mock Gitaly GetPatchResponse data", 100000)
+
+ GitalyGetSnapshotResponseMock = strings.Repeat("Mock Gitaly GetSnapshotResponse data", 100000)
+
+ GitalyReceivePackResponseMock []byte
+ GitalyUploadPackResponseMock []byte
+)
+
+func init() {
+ var err error
+ if GitalyReceivePackResponseMock, err = ioutil.ReadFile(path.Join(RootDir(), "testdata/receive-pack-fixture.txt")); err != nil {
+ log.WithError(err).Fatal("Unable to read pack response")
+ }
+ if GitalyUploadPackResponseMock, err = ioutil.ReadFile(path.Join(RootDir(), "testdata/upload-pack-fixture.txt")); err != nil {
+ log.WithError(err).Fatal("Unable to read pack response")
+ }
+}
+
+func NewGitalyServer(finalMessageCode codes.Code) *GitalyTestServer {
+ return &GitalyTestServer{finalMessageCode: finalMessageCode}
+}
+
+func (s *GitalyTestServer) InfoRefsUploadPack(in *gitalypb.InfoRefsRequest, stream gitalypb.SmartHTTPService_InfoRefsUploadPackServer) error {
+ s.WaitGroup.Add(1)
+ defer s.WaitGroup.Done()
+
+ if err := validateRepository(in.GetRepository()); err != nil {
+ return err
+ }
+
+ fmt.Printf("Result: %+v\n", in)
+
+ marshaler := &jsonpb.Marshaler{}
+ jsonString, err := marshaler.MarshalToString(in)
+ if err != nil {
+ return err
+ }
+
+ data := []byte(strings.Join([]string{
+ jsonString,
+ "git-upload-pack",
+ GitalyInfoRefsResponseMock,
+ }, "\000"))
+
+ s.LastIncomingMetadata = nil
+ if md, ok := metadata.FromIncomingContext(stream.Context()); ok {
+ s.LastIncomingMetadata = md
+ }
+
+ return s.sendInfoRefs(stream, data)
+}
+
+func (s *GitalyTestServer) InfoRefsReceivePack(in *gitalypb.InfoRefsRequest, stream gitalypb.SmartHTTPService_InfoRefsReceivePackServer) error {
+ s.WaitGroup.Add(1)
+ defer s.WaitGroup.Done()
+
+ if err := validateRepository(in.GetRepository()); err != nil {
+ return err
+ }
+
+ fmt.Printf("Result: %+v\n", in)
+
+ jsonString, err := marshalJSON(in)
+ if err != nil {
+ return err
+ }
+
+ data := []byte(strings.Join([]string{
+ jsonString,
+ "git-receive-pack",
+ GitalyInfoRefsResponseMock,
+ }, "\000"))
+
+ return s.sendInfoRefs(stream, data)
+}
+
+func marshalJSON(msg proto.Message) (string, error) {
+ marshaler := &jsonpb.Marshaler{}
+ return marshaler.MarshalToString(msg)
+}
+
+type infoRefsSender interface {
+ Send(*gitalypb.InfoRefsResponse) error
+}
+
+func (s *GitalyTestServer) sendInfoRefs(stream infoRefsSender, data []byte) error {
+ nSends, err := sendBytes(data, 100, func(p []byte) error {
+ return stream.Send(&gitalypb.InfoRefsResponse{Data: p})
+ })
+ if err != nil {
+ return err
+ }
+ if nSends <= 1 {
+ panic("should have sent more than one message")
+ }
+
+ return s.finalError()
+}
+
+func (s *GitalyTestServer) PostReceivePack(stream gitalypb.SmartHTTPService_PostReceivePackServer) error {
+ s.WaitGroup.Add(1)
+ defer s.WaitGroup.Done()
+
+ req, err := stream.Recv()
+ if err != nil {
+ return err
+ }
+
+ repo := req.GetRepository()
+ if err := validateRepository(repo); err != nil {
+ return err
+ }
+
+ jsonString, err := marshalJSON(req)
+ if err != nil {
+ return err
+ }
+
+ data := []byte(jsonString + "\000")
+
+ // The body of the request starts in the second message
+ for {
+ req, err := stream.Recv()
+ if err != nil {
+ if err != io.EOF {
+ return err
+ }
+ break
+ }
+
+ // We want to echo the request data back
+ data = append(data, req.GetData()...)
+ }
+
+ nSends, _ := sendBytes(data, 100, func(p []byte) error {
+ return stream.Send(&gitalypb.PostReceivePackResponse{Data: p})
+ })
+
+ if nSends <= 1 {
+ panic("should have sent more than one message")
+ }
+
+ return s.finalError()
+}
+
+func (s *GitalyTestServer) PostUploadPack(stream gitalypb.SmartHTTPService_PostUploadPackServer) error {
+ s.WaitGroup.Add(1)
+ defer s.WaitGroup.Done()
+
+ req, err := stream.Recv()
+ if err != nil {
+ return err
+ }
+
+ if err := validateRepository(req.GetRepository()); err != nil {
+ return err
+ }
+
+ jsonString, err := marshalJSON(req)
+ if err != nil {
+ return err
+ }
+
+ if err := stream.Send(&gitalypb.PostUploadPackResponse{
+ Data: []byte(strings.Join([]string{jsonString}, "\000") + "\000"),
+ }); err != nil {
+ return err
+ }
+
+ nSends := 0
+ // The body of the request starts in the second message. Gitaly streams PostUploadPack responses
+ // as soon as possible without reading the request completely first. We stream messages here
+ // directly back to the client to simulate the streaming of the actual implementation.
+ for {
+ req, err := stream.Recv()
+ if err != nil {
+ if err != io.EOF {
+ return err
+ }
+ break
+ }
+
+ if err := stream.Send(&gitalypb.PostUploadPackResponse{Data: req.GetData()}); err != nil {
+ return err
+ }
+
+ nSends++
+ }
+
+ if nSends <= 1 {
+ panic("should have sent more than one message")
+ }
+
+ return s.finalError()
+}
+
+func (s *GitalyTestServer) CommitIsAncestor(ctx context.Context, in *gitalypb.CommitIsAncestorRequest) (*gitalypb.CommitIsAncestorResponse, error) {
+ return nil, nil
+}
+
+func (s *GitalyTestServer) GetBlob(in *gitalypb.GetBlobRequest, stream gitalypb.BlobService_GetBlobServer) error {
+ s.WaitGroup.Add(1)
+ defer s.WaitGroup.Done()
+
+ if err := validateRepository(in.GetRepository()); err != nil {
+ return err
+ }
+
+ response := &gitalypb.GetBlobResponse{
+ Oid: in.GetOid(),
+ Size: int64(len(GitalyGetBlobResponseMock)),
+ }
+ nSends, err := sendBytes([]byte(GitalyGetBlobResponseMock), 100, func(p []byte) error {
+ response.Data = p
+
+ if err := stream.Send(response); err != nil {
+ return err
+ }
+
+ // Use a new response so we don't send other fields (Size, ...) over and over
+ response = &gitalypb.GetBlobResponse{}
+
+ return nil
+ })
+ if err != nil {
+ return err
+ }
+ if nSends <= 1 {
+ panic("should have sent more than one message")
+ }
+
+ return s.finalError()
+}
+
+func (s *GitalyTestServer) GetArchive(in *gitalypb.GetArchiveRequest, stream gitalypb.RepositoryService_GetArchiveServer) error {
+ s.WaitGroup.Add(1)
+ defer s.WaitGroup.Done()
+
+ if err := validateRepository(in.GetRepository()); err != nil {
+ return err
+ }
+
+ nSends, err := sendBytes([]byte(GitalyGetArchiveResponseMock), 100, func(p []byte) error {
+ return stream.Send(&gitalypb.GetArchiveResponse{Data: p})
+ })
+ if err != nil {
+ return err
+ }
+ if nSends <= 1 {
+ panic("should have sent more than one message")
+ }
+
+ return s.finalError()
+}
+
+func (s *GitalyTestServer) RawDiff(in *gitalypb.RawDiffRequest, stream gitalypb.DiffService_RawDiffServer) error {
+ nSends, err := sendBytes([]byte(GitalyGetDiffResponseMock), 100, func(p []byte) error {
+ return stream.Send(&gitalypb.RawDiffResponse{
+ Data: p,
+ })
+ })
+ if err != nil {
+ return err
+ }
+ if nSends <= 1 {
+ panic("should have sent more than one message")
+ }
+
+ return s.finalError()
+}
+
+func (s *GitalyTestServer) RawPatch(in *gitalypb.RawPatchRequest, stream gitalypb.DiffService_RawPatchServer) error {
+ s.WaitGroup.Add(1)
+ defer s.WaitGroup.Done()
+
+ if err := validateRepository(in.GetRepository()); err != nil {
+ return err
+ }
+
+ nSends, err := sendBytes([]byte(GitalyGetPatchResponseMock), 100, func(p []byte) error {
+ return stream.Send(&gitalypb.RawPatchResponse{
+ Data: p,
+ })
+ })
+ if err != nil {
+ return err
+ }
+ if nSends <= 1 {
+ panic("should have sent more than one message")
+ }
+
+ return s.finalError()
+}
+
+func (s *GitalyTestServer) GetSnapshot(in *gitalypb.GetSnapshotRequest, stream gitalypb.RepositoryService_GetSnapshotServer) error {
+ s.WaitGroup.Add(1)
+ defer s.WaitGroup.Done()
+
+ if err := validateRepository(in.GetRepository()); err != nil {
+ return err
+ }
+
+ nSends, err := sendBytes([]byte(GitalyGetSnapshotResponseMock), 100, func(p []byte) error {
+ return stream.Send(&gitalypb.GetSnapshotResponse{Data: p})
+ })
+ if err != nil {
+ return err
+ }
+ if nSends <= 1 {
+ panic("should have sent more than one message")
+ }
+
+ return s.finalError()
+}
+
+// sendBytes returns the number of times the 'sender' function was called and an error.
+func sendBytes(data []byte, chunkSize int, sender func([]byte) error) (int, error) {
+ i := 0
+ for ; len(data) > 0; i++ {
+ n := chunkSize
+ if n > len(data) {
+ n = len(data)
+ }
+
+ if err := sender(data[:n]); err != nil {
+ return i, err
+ }
+ data = data[n:]
+ }
+
+ return i, nil
+}
+
+func (s *GitalyTestServer) finalError() error {
+ if code := s.finalMessageCode; code != codes.OK {
+ return status.Errorf(code, "error as specified by test")
+ }
+
+ return nil
+}
+
+func validateRepository(repo *gitalypb.Repository) error {
+ if len(repo.GetStorageName()) == 0 {
+ return fmt.Errorf("missing storage_name: %v", repo)
+ }
+ if len(repo.GetRelativePath()) == 0 {
+ return fmt.Errorf("missing relative_path: %v", repo)
+ }
+ return nil
+}