diff options
author | Jacob Vosmaer <jacob@gitlab.com> | 2017-06-22 17:42:43 +0300 |
---|---|---|
committer | Jacob Vosmaer <jacob@gitlab.com> | 2017-06-26 12:08:05 +0300 |
commit | 28a2fcaf9d643de52371f0051bc2734d7236152f (patch) | |
tree | 8c8e90b0781a8153231ecdaf8e4d1409b5fc9b97 | |
parent | b933e5ce4843ec6c332a0184afb8e69820cc9050 (diff) |
Use stream helpers from top-level package
-rw-r--r-- | internal/service/commit/tree_entry.go | 4 | ||||
-rw-r--r-- | internal/service/smarthttp/inforefs.go | 6 | ||||
-rw-r--r-- | internal/service/smarthttp/receive_pack.go | 6 | ||||
-rw-r--r-- | internal/service/smarthttp/receive_pack_test.go | 6 | ||||
-rw-r--r-- | internal/service/smarthttp/upload_pack.go | 6 | ||||
-rw-r--r-- | internal/service/smarthttp/upload_pack_test.go | 8 | ||||
-rw-r--r-- | internal/service/ssh/receive_pack.go | 8 | ||||
-rw-r--r-- | internal/service/ssh/uploadpack.go | 8 | ||||
-rw-r--r-- | streamio/stream.go | 44 | ||||
-rw-r--r-- | streamio/stream_test.go | 63 |
10 files changed, 133 insertions, 26 deletions
diff --git a/internal/service/commit/tree_entry.go b/internal/service/commit/tree_entry.go index 4ad1ddbff..954f4777f 100644 --- a/internal/service/commit/tree_entry.go +++ b/internal/service/commit/tree_entry.go @@ -12,7 +12,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/helper" pb "gitlab.com/gitlab-org/gitaly-proto/go" - pbhelper "gitlab.com/gitlab-org/gitaly-proto/go/helper" + "gitlab.com/gitlab-org/gitaly/streamio" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -132,7 +132,7 @@ func (s *server) TreeEntry(in *pb.TreeEntryRequest, stream pb.Commit_TreeEntrySe return helper.DecorateError(codes.Unavailable, stream.Send(response)) } - sw := pbhelper.NewSendWriter(func(p []byte) error { + sw := streamio.NewWriter(func(p []byte) error { response.Data = p if err := stream.Send(response); err != nil { diff --git a/internal/service/smarthttp/inforefs.go b/internal/service/smarthttp/inforefs.go index 8eae55974..7dc10740f 100644 --- a/internal/service/smarthttp/inforefs.go +++ b/internal/service/smarthttp/inforefs.go @@ -6,22 +6,22 @@ import ( log "github.com/Sirupsen/logrus" pb "gitlab.com/gitlab-org/gitaly-proto/go" - pbhelper "gitlab.com/gitlab-org/gitaly-proto/go/helper" "gitlab.com/gitlab-org/gitaly/internal/helper" + "gitlab.com/gitlab-org/gitaly/streamio" "google.golang.org/grpc" "google.golang.org/grpc/codes" ) func (s *server) InfoRefsUploadPack(in *pb.InfoRefsRequest, stream pb.SmartHTTP_InfoRefsUploadPackServer) error { - w := pbhelper.NewSendWriter(func(p []byte) error { + w := streamio.NewWriter(func(p []byte) error { return stream.Send(&pb.InfoRefsResponse{Data: p}) }) return handleInfoRefs("upload-pack", in.Repository, w) } func (s *server) InfoRefsReceivePack(in *pb.InfoRefsRequest, stream pb.SmartHTTP_InfoRefsReceivePackServer) error { - w := pbhelper.NewSendWriter(func(p []byte) error { + w := streamio.NewWriter(func(p []byte) error { return stream.Send(&pb.InfoRefsResponse{Data: p}) }) return handleInfoRefs("receive-pack", in.Repository, w) diff --git a/internal/service/smarthttp/receive_pack.go b/internal/service/smarthttp/receive_pack.go index 9251d8b88..28fff96dd 100644 --- a/internal/service/smarthttp/receive_pack.go +++ b/internal/service/smarthttp/receive_pack.go @@ -8,7 +8,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/helper" pb "gitlab.com/gitlab-org/gitaly-proto/go" - pbhelper "gitlab.com/gitlab-org/gitaly-proto/go/helper" + "gitlab.com/gitlab-org/gitaly/streamio" "google.golang.org/grpc" "google.golang.org/grpc/codes" ) @@ -22,11 +22,11 @@ func (s *server) PostReceivePack(stream pb.SmartHTTP_PostReceivePackServer) erro return err } - stdin := pbhelper.NewReceiveReader(func() ([]byte, error) { + stdin := streamio.NewReader(func() ([]byte, error) { resp, err := stream.Recv() return resp.GetData(), err }) - stdout := pbhelper.NewSendWriter(func(p []byte) error { + stdout := streamio.NewWriter(func(p []byte) error { return stream.Send(&pb.PostReceivePackResponse{Data: p}) }) env := []string{ diff --git a/internal/service/smarthttp/receive_pack_test.go b/internal/service/smarthttp/receive_pack_test.go index 3f1cc32e7..74573e2de 100644 --- a/internal/service/smarthttp/receive_pack_test.go +++ b/internal/service/smarthttp/receive_pack_test.go @@ -12,7 +12,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/testhelper" pb "gitlab.com/gitlab-org/gitaly-proto/go" - pbhelper "gitlab.com/gitlab-org/gitaly-proto/go/helper" + "gitlab.com/gitlab-org/gitaly/streamio" "github.com/stretchr/testify/require" "golang.org/x/net/context" @@ -78,7 +78,7 @@ func TestSuccessfulReceivePackRequest(t *testing.T) { require.NoError(t, stream.Send(rpcRequest)) - sw := pbhelper.NewSendWriter(func(p []byte) error { + sw := streamio.NewWriter(func(p []byte) error { return stream.Send(&pb.PostReceivePackRequest{Data: p}) }) _, err = io.Copy(sw, requestBuffer) @@ -88,7 +88,7 @@ func TestSuccessfulReceivePackRequest(t *testing.T) { // Verify everything is going as planned responseBuffer := bytes.Buffer{} - rr := pbhelper.NewReceiveReader(func() ([]byte, error) { + rr := streamio.NewReader(func() ([]byte, error) { resp, err := stream.Recv() return resp.GetData(), err }) diff --git a/internal/service/smarthttp/upload_pack.go b/internal/service/smarthttp/upload_pack.go index fad83d14d..5510d864b 100644 --- a/internal/service/smarthttp/upload_pack.go +++ b/internal/service/smarthttp/upload_pack.go @@ -8,7 +8,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/helper" pb "gitlab.com/gitlab-org/gitaly-proto/go" - pbhelper "gitlab.com/gitlab-org/gitaly-proto/go/helper" + "gitlab.com/gitlab-org/gitaly/streamio" "github.com/prometheus/client_golang/prometheus" "google.golang.org/grpc" @@ -37,7 +37,7 @@ func (s *server) PostUploadPack(stream pb.SmartHTTP_PostUploadPackServer) error return err } - stdinReader := pbhelper.NewReceiveReader(func() ([]byte, error) { + stdinReader := streamio.NewReader(func() ([]byte, error) { resp, err := stream.Recv() return resp.GetData(), err }) @@ -49,7 +49,7 @@ func (s *server) PostUploadPack(stream pb.SmartHTTP_PostUploadPackServer) error deepenCh <- scanDeepen(pr) }() - stdout := pbhelper.NewSendWriter(func(p []byte) error { + stdout := streamio.NewWriter(func(p []byte) error { return stream.Send(&pb.PostUploadPackResponse{Data: p}) }) repoPath, err := helper.GetRepoPath(req.Repository) diff --git a/internal/service/smarthttp/upload_pack_test.go b/internal/service/smarthttp/upload_pack_test.go index 7f8665ac0..a2f777cd4 100644 --- a/internal/service/smarthttp/upload_pack_test.go +++ b/internal/service/smarthttp/upload_pack_test.go @@ -15,7 +15,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/testhelper" pb "gitlab.com/gitlab-org/gitaly-proto/go" - pbhelper "gitlab.com/gitlab-org/gitaly-proto/go/helper" + "gitlab.com/gitlab-org/gitaly/streamio" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -74,7 +74,7 @@ func TestSuccessfulUploadPackRequest(t *testing.T) { require.NoError(t, stream.Send(rpcRequest)) - sw := pbhelper.NewSendWriter(func(p []byte) error { + sw := streamio.NewWriter(func(p []byte) error { return stream.Send(&pb.PostUploadPackRequest{Data: p}) }) _, err = io.Copy(sw, requestBuffer) @@ -82,7 +82,7 @@ func TestSuccessfulUploadPackRequest(t *testing.T) { stream.CloseSend() responseBuffer := &bytes.Buffer{} - rr := pbhelper.NewReceiveReader(func() ([]byte, error) { + rr := streamio.NewReader(func() ([]byte, error) { resp, err := stream.Recv() return resp.GetData(), err }) @@ -118,7 +118,7 @@ func TestSuccessfulUploadPackDeepenRequest(t *testing.T) { require.NoError(t, stream.Send(&pb.PostUploadPackRequest{Data: []byte(requestBody)})) stream.CloseSend() - rr := pbhelper.NewReceiveReader(func() ([]byte, error) { + rr := streamio.NewReader(func() ([]byte, error) { resp, err := stream.Recv() return resp.GetData(), err }) diff --git a/internal/service/ssh/receive_pack.go b/internal/service/ssh/receive_pack.go index 5eadbc3cc..59ff65f5e 100644 --- a/internal/service/ssh/receive_pack.go +++ b/internal/service/ssh/receive_pack.go @@ -8,7 +8,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/helper" pb "gitlab.com/gitlab-org/gitaly-proto/go" - pbhelper "gitlab.com/gitlab-org/gitaly-proto/go/helper" + "gitlab.com/gitlab-org/gitaly/streamio" "google.golang.org/grpc" "google.golang.org/grpc/codes" ) @@ -22,14 +22,14 @@ func (s *server) SSHReceivePack(stream pb.SSH_SSHReceivePackServer) error { return err } - stdin := pbhelper.NewReceiveReader(func() ([]byte, error) { + stdin := streamio.NewReader(func() ([]byte, error) { request, err := stream.Recv() return request.GetStdin(), err }) - stdout := pbhelper.NewSendWriter(func(p []byte) error { + stdout := streamio.NewWriter(func(p []byte) error { return stream.Send(&pb.SSHReceivePackResponse{Stdout: p}) }) - stderr := pbhelper.NewSendWriter(func(p []byte) error { + stderr := streamio.NewWriter(func(p []byte) error { return stream.Send(&pb.SSHReceivePackResponse{Stderr: p}) }) env := []string{ diff --git a/internal/service/ssh/uploadpack.go b/internal/service/ssh/uploadpack.go index aecf251dd..9bf3b3455 100644 --- a/internal/service/ssh/uploadpack.go +++ b/internal/service/ssh/uploadpack.go @@ -5,8 +5,8 @@ import ( log "github.com/Sirupsen/logrus" pb "gitlab.com/gitlab-org/gitaly-proto/go" - pbhelper "gitlab.com/gitlab-org/gitaly-proto/go/helper" "gitlab.com/gitlab-org/gitaly/internal/helper" + "gitlab.com/gitlab-org/gitaly/streamio" "google.golang.org/grpc" "google.golang.org/grpc/codes" ) @@ -20,14 +20,14 @@ func (s *server) SSHUploadPack(stream pb.SSH_SSHUploadPackServer) error { return err } - stdin := pbhelper.NewReceiveReader(func() ([]byte, error) { + stdin := streamio.NewReader(func() ([]byte, error) { request, err := stream.Recv() return request.GetStdin(), err }) - stdout := pbhelper.NewSendWriter(func(p []byte) error { + stdout := streamio.NewWriter(func(p []byte) error { return stream.Send(&pb.SSHUploadPackResponse{Stdout: p}) }) - stderr := pbhelper.NewSendWriter(func(p []byte) error { + stderr := streamio.NewWriter(func(p []byte) error { return stream.Send(&pb.SSHUploadPackResponse{Stderr: p}) }) repoPath, err := helper.GetRepoPath(req.Repository) diff --git a/streamio/stream.go b/streamio/stream.go new file mode 100644 index 000000000..dafade8fb --- /dev/null +++ b/streamio/stream.go @@ -0,0 +1,44 @@ +package streamio + +import ( + "io" +) + +// NewReader turns receiver into an io.Reader. Errors from the receiver +// function are passed on unmodified. This means receiver should emit +// io.EOF when done. +func NewReader(receiver func() ([]byte, error)) io.Reader { + return &receiveReader{receiver: receiver} +} + +type receiveReader struct { + receiver func() ([]byte, error) + data []byte + err error +} + +func (rr *receiveReader) Read(p []byte) (int, error) { + if len(rr.data) == 0 { + rr.data, rr.err = rr.receiver() + } + n := copy(p, rr.data) + rr.data = rr.data[n:] + if len(rr.data) == 0 { + return n, rr.err + } + return n, nil +} + +// NewWriter turns sender into an io.Writer. The number of 'bytes +// written' reported back is always len(p). +func NewWriter(sender func(p []byte) error) io.Writer { + return &sendWriter{sender: sender} +} + +type sendWriter struct { + sender func([]byte) error +} + +func (sw *sendWriter) Write(p []byte) (int, error) { + return len(p), sw.sender(p) +} diff --git a/streamio/stream_test.go b/streamio/stream_test.go new file mode 100644 index 000000000..456d17e80 --- /dev/null +++ b/streamio/stream_test.go @@ -0,0 +1,63 @@ +package streamio + +import ( + "bytes" + "fmt" + "io" + "io/ioutil" + "strings" + "testing" + "testing/iotest" + + "github.com/stretchr/testify/require" +) + +func TestReceiveSources(t *testing.T) { + testData := "Hello this is the test data that will be received" + testCases := []struct { + desc string + r io.Reader + }{ + {desc: "base", r: strings.NewReader(testData)}, + {desc: "dataerr", r: iotest.DataErrReader(strings.NewReader(testData))}, + {desc: "onebyte", r: iotest.OneByteReader(strings.NewReader(testData))}, + {desc: "dataerr(onebyte)", r: iotest.DataErrReader(iotest.OneByteReader(strings.NewReader(testData)))}, + } + + for _, tc := range testCases { + data, err := ioutil.ReadAll(NewReader(receiverFromReader(tc.r))) + require.NoError(t, err, tc.desc) + require.Equal(t, testData, string(data), tc.desc) + } +} + +func TestReadSizes(t *testing.T) { + testData := "Hello this is the test data that will be received. It goes on for a while bla bla bla." + for n := 1; n < 100; n *= 3 { + desc := fmt.Sprintf("reads of size %d", n) + buffer := make([]byte, n) + result := &bytes.Buffer{} + reader := &opaqueReader{NewReader(receiverFromReader(strings.NewReader(testData)))} + _, err := io.CopyBuffer(&opaqueWriter{result}, reader, buffer) + require.NoError(t, err, desc) + require.Equal(t, testData, result.String()) + } +} + +func receiverFromReader(r io.Reader) func() ([]byte, error) { + return func() ([]byte, error) { + data := make([]byte, 10) + n, err := r.Read(data) + return data[:n], err + } +} + +// Hide io.WriteTo if it exists +type opaqueReader struct { + io.Reader +} + +// Hide io.ReadFrom if it exists +type opaqueWriter struct { + io.Writer +} |