Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJacob Vosmaer <jacob@gitlab.com>2017-06-22 17:42:43 +0300
committerJacob Vosmaer <jacob@gitlab.com>2017-06-26 12:08:05 +0300
commit28a2fcaf9d643de52371f0051bc2734d7236152f (patch)
tree8c8e90b0781a8153231ecdaf8e4d1409b5fc9b97
parentb933e5ce4843ec6c332a0184afb8e69820cc9050 (diff)
Use stream helpers from top-level package
-rw-r--r--internal/service/commit/tree_entry.go4
-rw-r--r--internal/service/smarthttp/inforefs.go6
-rw-r--r--internal/service/smarthttp/receive_pack.go6
-rw-r--r--internal/service/smarthttp/receive_pack_test.go6
-rw-r--r--internal/service/smarthttp/upload_pack.go6
-rw-r--r--internal/service/smarthttp/upload_pack_test.go8
-rw-r--r--internal/service/ssh/receive_pack.go8
-rw-r--r--internal/service/ssh/uploadpack.go8
-rw-r--r--streamio/stream.go44
-rw-r--r--streamio/stream_test.go63
10 files changed, 133 insertions, 26 deletions
diff --git a/internal/service/commit/tree_entry.go b/internal/service/commit/tree_entry.go
index 4ad1ddbff..954f4777f 100644
--- a/internal/service/commit/tree_entry.go
+++ b/internal/service/commit/tree_entry.go
@@ -12,7 +12,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/helper"
pb "gitlab.com/gitlab-org/gitaly-proto/go"
- pbhelper "gitlab.com/gitlab-org/gitaly-proto/go/helper"
+ "gitlab.com/gitlab-org/gitaly/streamio"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@@ -132,7 +132,7 @@ func (s *server) TreeEntry(in *pb.TreeEntryRequest, stream pb.Commit_TreeEntrySe
return helper.DecorateError(codes.Unavailable, stream.Send(response))
}
- sw := pbhelper.NewSendWriter(func(p []byte) error {
+ sw := streamio.NewWriter(func(p []byte) error {
response.Data = p
if err := stream.Send(response); err != nil {
diff --git a/internal/service/smarthttp/inforefs.go b/internal/service/smarthttp/inforefs.go
index 8eae55974..7dc10740f 100644
--- a/internal/service/smarthttp/inforefs.go
+++ b/internal/service/smarthttp/inforefs.go
@@ -6,22 +6,22 @@ import (
log "github.com/Sirupsen/logrus"
pb "gitlab.com/gitlab-org/gitaly-proto/go"
- pbhelper "gitlab.com/gitlab-org/gitaly-proto/go/helper"
"gitlab.com/gitlab-org/gitaly/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/streamio"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
func (s *server) InfoRefsUploadPack(in *pb.InfoRefsRequest, stream pb.SmartHTTP_InfoRefsUploadPackServer) error {
- w := pbhelper.NewSendWriter(func(p []byte) error {
+ w := streamio.NewWriter(func(p []byte) error {
return stream.Send(&pb.InfoRefsResponse{Data: p})
})
return handleInfoRefs("upload-pack", in.Repository, w)
}
func (s *server) InfoRefsReceivePack(in *pb.InfoRefsRequest, stream pb.SmartHTTP_InfoRefsReceivePackServer) error {
- w := pbhelper.NewSendWriter(func(p []byte) error {
+ w := streamio.NewWriter(func(p []byte) error {
return stream.Send(&pb.InfoRefsResponse{Data: p})
})
return handleInfoRefs("receive-pack", in.Repository, w)
diff --git a/internal/service/smarthttp/receive_pack.go b/internal/service/smarthttp/receive_pack.go
index 9251d8b88..28fff96dd 100644
--- a/internal/service/smarthttp/receive_pack.go
+++ b/internal/service/smarthttp/receive_pack.go
@@ -8,7 +8,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/helper"
pb "gitlab.com/gitlab-org/gitaly-proto/go"
- pbhelper "gitlab.com/gitlab-org/gitaly-proto/go/helper"
+ "gitlab.com/gitlab-org/gitaly/streamio"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
@@ -22,11 +22,11 @@ func (s *server) PostReceivePack(stream pb.SmartHTTP_PostReceivePackServer) erro
return err
}
- stdin := pbhelper.NewReceiveReader(func() ([]byte, error) {
+ stdin := streamio.NewReader(func() ([]byte, error) {
resp, err := stream.Recv()
return resp.GetData(), err
})
- stdout := pbhelper.NewSendWriter(func(p []byte) error {
+ stdout := streamio.NewWriter(func(p []byte) error {
return stream.Send(&pb.PostReceivePackResponse{Data: p})
})
env := []string{
diff --git a/internal/service/smarthttp/receive_pack_test.go b/internal/service/smarthttp/receive_pack_test.go
index 3f1cc32e7..74573e2de 100644
--- a/internal/service/smarthttp/receive_pack_test.go
+++ b/internal/service/smarthttp/receive_pack_test.go
@@ -12,7 +12,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
pb "gitlab.com/gitlab-org/gitaly-proto/go"
- pbhelper "gitlab.com/gitlab-org/gitaly-proto/go/helper"
+ "gitlab.com/gitlab-org/gitaly/streamio"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
@@ -78,7 +78,7 @@ func TestSuccessfulReceivePackRequest(t *testing.T) {
require.NoError(t, stream.Send(rpcRequest))
- sw := pbhelper.NewSendWriter(func(p []byte) error {
+ sw := streamio.NewWriter(func(p []byte) error {
return stream.Send(&pb.PostReceivePackRequest{Data: p})
})
_, err = io.Copy(sw, requestBuffer)
@@ -88,7 +88,7 @@ func TestSuccessfulReceivePackRequest(t *testing.T) {
// Verify everything is going as planned
responseBuffer := bytes.Buffer{}
- rr := pbhelper.NewReceiveReader(func() ([]byte, error) {
+ rr := streamio.NewReader(func() ([]byte, error) {
resp, err := stream.Recv()
return resp.GetData(), err
})
diff --git a/internal/service/smarthttp/upload_pack.go b/internal/service/smarthttp/upload_pack.go
index fad83d14d..5510d864b 100644
--- a/internal/service/smarthttp/upload_pack.go
+++ b/internal/service/smarthttp/upload_pack.go
@@ -8,7 +8,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/helper"
pb "gitlab.com/gitlab-org/gitaly-proto/go"
- pbhelper "gitlab.com/gitlab-org/gitaly-proto/go/helper"
+ "gitlab.com/gitlab-org/gitaly/streamio"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
@@ -37,7 +37,7 @@ func (s *server) PostUploadPack(stream pb.SmartHTTP_PostUploadPackServer) error
return err
}
- stdinReader := pbhelper.NewReceiveReader(func() ([]byte, error) {
+ stdinReader := streamio.NewReader(func() ([]byte, error) {
resp, err := stream.Recv()
return resp.GetData(), err
})
@@ -49,7 +49,7 @@ func (s *server) PostUploadPack(stream pb.SmartHTTP_PostUploadPackServer) error
deepenCh <- scanDeepen(pr)
}()
- stdout := pbhelper.NewSendWriter(func(p []byte) error {
+ stdout := streamio.NewWriter(func(p []byte) error {
return stream.Send(&pb.PostUploadPackResponse{Data: p})
})
repoPath, err := helper.GetRepoPath(req.Repository)
diff --git a/internal/service/smarthttp/upload_pack_test.go b/internal/service/smarthttp/upload_pack_test.go
index 7f8665ac0..a2f777cd4 100644
--- a/internal/service/smarthttp/upload_pack_test.go
+++ b/internal/service/smarthttp/upload_pack_test.go
@@ -15,7 +15,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
pb "gitlab.com/gitlab-org/gitaly-proto/go"
- pbhelper "gitlab.com/gitlab-org/gitaly-proto/go/helper"
+ "gitlab.com/gitlab-org/gitaly/streamio"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -74,7 +74,7 @@ func TestSuccessfulUploadPackRequest(t *testing.T) {
require.NoError(t, stream.Send(rpcRequest))
- sw := pbhelper.NewSendWriter(func(p []byte) error {
+ sw := streamio.NewWriter(func(p []byte) error {
return stream.Send(&pb.PostUploadPackRequest{Data: p})
})
_, err = io.Copy(sw, requestBuffer)
@@ -82,7 +82,7 @@ func TestSuccessfulUploadPackRequest(t *testing.T) {
stream.CloseSend()
responseBuffer := &bytes.Buffer{}
- rr := pbhelper.NewReceiveReader(func() ([]byte, error) {
+ rr := streamio.NewReader(func() ([]byte, error) {
resp, err := stream.Recv()
return resp.GetData(), err
})
@@ -118,7 +118,7 @@ func TestSuccessfulUploadPackDeepenRequest(t *testing.T) {
require.NoError(t, stream.Send(&pb.PostUploadPackRequest{Data: []byte(requestBody)}))
stream.CloseSend()
- rr := pbhelper.NewReceiveReader(func() ([]byte, error) {
+ rr := streamio.NewReader(func() ([]byte, error) {
resp, err := stream.Recv()
return resp.GetData(), err
})
diff --git a/internal/service/ssh/receive_pack.go b/internal/service/ssh/receive_pack.go
index 5eadbc3cc..59ff65f5e 100644
--- a/internal/service/ssh/receive_pack.go
+++ b/internal/service/ssh/receive_pack.go
@@ -8,7 +8,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/helper"
pb "gitlab.com/gitlab-org/gitaly-proto/go"
- pbhelper "gitlab.com/gitlab-org/gitaly-proto/go/helper"
+ "gitlab.com/gitlab-org/gitaly/streamio"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
@@ -22,14 +22,14 @@ func (s *server) SSHReceivePack(stream pb.SSH_SSHReceivePackServer) error {
return err
}
- stdin := pbhelper.NewReceiveReader(func() ([]byte, error) {
+ stdin := streamio.NewReader(func() ([]byte, error) {
request, err := stream.Recv()
return request.GetStdin(), err
})
- stdout := pbhelper.NewSendWriter(func(p []byte) error {
+ stdout := streamio.NewWriter(func(p []byte) error {
return stream.Send(&pb.SSHReceivePackResponse{Stdout: p})
})
- stderr := pbhelper.NewSendWriter(func(p []byte) error {
+ stderr := streamio.NewWriter(func(p []byte) error {
return stream.Send(&pb.SSHReceivePackResponse{Stderr: p})
})
env := []string{
diff --git a/internal/service/ssh/uploadpack.go b/internal/service/ssh/uploadpack.go
index aecf251dd..9bf3b3455 100644
--- a/internal/service/ssh/uploadpack.go
+++ b/internal/service/ssh/uploadpack.go
@@ -5,8 +5,8 @@ import (
log "github.com/Sirupsen/logrus"
pb "gitlab.com/gitlab-org/gitaly-proto/go"
- pbhelper "gitlab.com/gitlab-org/gitaly-proto/go/helper"
"gitlab.com/gitlab-org/gitaly/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/streamio"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
@@ -20,14 +20,14 @@ func (s *server) SSHUploadPack(stream pb.SSH_SSHUploadPackServer) error {
return err
}
- stdin := pbhelper.NewReceiveReader(func() ([]byte, error) {
+ stdin := streamio.NewReader(func() ([]byte, error) {
request, err := stream.Recv()
return request.GetStdin(), err
})
- stdout := pbhelper.NewSendWriter(func(p []byte) error {
+ stdout := streamio.NewWriter(func(p []byte) error {
return stream.Send(&pb.SSHUploadPackResponse{Stdout: p})
})
- stderr := pbhelper.NewSendWriter(func(p []byte) error {
+ stderr := streamio.NewWriter(func(p []byte) error {
return stream.Send(&pb.SSHUploadPackResponse{Stderr: p})
})
repoPath, err := helper.GetRepoPath(req.Repository)
diff --git a/streamio/stream.go b/streamio/stream.go
new file mode 100644
index 000000000..dafade8fb
--- /dev/null
+++ b/streamio/stream.go
@@ -0,0 +1,44 @@
+package streamio
+
+import (
+ "io"
+)
+
+// NewReader turns receiver into an io.Reader. Errors from the receiver
+// function are passed on unmodified. This means receiver should emit
+// io.EOF when done.
+func NewReader(receiver func() ([]byte, error)) io.Reader {
+ return &receiveReader{receiver: receiver}
+}
+
+type receiveReader struct {
+ receiver func() ([]byte, error)
+ data []byte
+ err error
+}
+
+func (rr *receiveReader) Read(p []byte) (int, error) {
+ if len(rr.data) == 0 {
+ rr.data, rr.err = rr.receiver()
+ }
+ n := copy(p, rr.data)
+ rr.data = rr.data[n:]
+ if len(rr.data) == 0 {
+ return n, rr.err
+ }
+ return n, nil
+}
+
+// NewWriter turns sender into an io.Writer. The number of 'bytes
+// written' reported back is always len(p).
+func NewWriter(sender func(p []byte) error) io.Writer {
+ return &sendWriter{sender: sender}
+}
+
+type sendWriter struct {
+ sender func([]byte) error
+}
+
+func (sw *sendWriter) Write(p []byte) (int, error) {
+ return len(p), sw.sender(p)
+}
diff --git a/streamio/stream_test.go b/streamio/stream_test.go
new file mode 100644
index 000000000..456d17e80
--- /dev/null
+++ b/streamio/stream_test.go
@@ -0,0 +1,63 @@
+package streamio
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "strings"
+ "testing"
+ "testing/iotest"
+
+ "github.com/stretchr/testify/require"
+)
+
+func TestReceiveSources(t *testing.T) {
+ testData := "Hello this is the test data that will be received"
+ testCases := []struct {
+ desc string
+ r io.Reader
+ }{
+ {desc: "base", r: strings.NewReader(testData)},
+ {desc: "dataerr", r: iotest.DataErrReader(strings.NewReader(testData))},
+ {desc: "onebyte", r: iotest.OneByteReader(strings.NewReader(testData))},
+ {desc: "dataerr(onebyte)", r: iotest.DataErrReader(iotest.OneByteReader(strings.NewReader(testData)))},
+ }
+
+ for _, tc := range testCases {
+ data, err := ioutil.ReadAll(NewReader(receiverFromReader(tc.r)))
+ require.NoError(t, err, tc.desc)
+ require.Equal(t, testData, string(data), tc.desc)
+ }
+}
+
+func TestReadSizes(t *testing.T) {
+ testData := "Hello this is the test data that will be received. It goes on for a while bla bla bla."
+ for n := 1; n < 100; n *= 3 {
+ desc := fmt.Sprintf("reads of size %d", n)
+ buffer := make([]byte, n)
+ result := &bytes.Buffer{}
+ reader := &opaqueReader{NewReader(receiverFromReader(strings.NewReader(testData)))}
+ _, err := io.CopyBuffer(&opaqueWriter{result}, reader, buffer)
+ require.NoError(t, err, desc)
+ require.Equal(t, testData, result.String())
+ }
+}
+
+func receiverFromReader(r io.Reader) func() ([]byte, error) {
+ return func() ([]byte, error) {
+ data := make([]byte, 10)
+ n, err := r.Read(data)
+ return data[:n], err
+ }
+}
+
+// Hide io.WriteTo if it exists
+type opaqueReader struct {
+ io.Reader
+}
+
+// Hide io.ReadFrom if it exists
+type opaqueWriter struct {
+ io.Writer
+}