Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJacob Vosmaer <jacob@gitlab.com>2017-06-22 17:42:43 +0300
committerJacob Vosmaer <jacob@gitlab.com>2017-06-26 12:08:05 +0300
commit28a2fcaf9d643de52371f0051bc2734d7236152f (patch)
tree8c8e90b0781a8153231ecdaf8e4d1409b5fc9b97 /streamio
parentb933e5ce4843ec6c332a0184afb8e69820cc9050 (diff)
Use stream helpers from top-level package
Diffstat (limited to 'streamio')
-rw-r--r--streamio/stream.go44
-rw-r--r--streamio/stream_test.go63
2 files changed, 107 insertions, 0 deletions
diff --git a/streamio/stream.go b/streamio/stream.go
new file mode 100644
index 000000000..dafade8fb
--- /dev/null
+++ b/streamio/stream.go
@@ -0,0 +1,44 @@
+package streamio
+
+import (
+ "io"
+)
+
+// NewReader turns receiver into an io.Reader. Errors from the receiver
+// function are passed on unmodified. This means receiver should emit
+// io.EOF when done.
+func NewReader(receiver func() ([]byte, error)) io.Reader {
+ return &receiveReader{receiver: receiver}
+}
+
+type receiveReader struct {
+ receiver func() ([]byte, error)
+ data []byte
+ err error
+}
+
+func (rr *receiveReader) Read(p []byte) (int, error) {
+ if len(rr.data) == 0 {
+ rr.data, rr.err = rr.receiver()
+ }
+ n := copy(p, rr.data)
+ rr.data = rr.data[n:]
+ if len(rr.data) == 0 {
+ return n, rr.err
+ }
+ return n, nil
+}
+
+// NewWriter turns sender into an io.Writer. The number of 'bytes
+// written' reported back is always len(p).
+func NewWriter(sender func(p []byte) error) io.Writer {
+ return &sendWriter{sender: sender}
+}
+
+type sendWriter struct {
+ sender func([]byte) error
+}
+
+func (sw *sendWriter) Write(p []byte) (int, error) {
+ return len(p), sw.sender(p)
+}
diff --git a/streamio/stream_test.go b/streamio/stream_test.go
new file mode 100644
index 000000000..456d17e80
--- /dev/null
+++ b/streamio/stream_test.go
@@ -0,0 +1,63 @@
+package streamio
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "strings"
+ "testing"
+ "testing/iotest"
+
+ "github.com/stretchr/testify/require"
+)
+
+func TestReceiveSources(t *testing.T) {
+ testData := "Hello this is the test data that will be received"
+ testCases := []struct {
+ desc string
+ r io.Reader
+ }{
+ {desc: "base", r: strings.NewReader(testData)},
+ {desc: "dataerr", r: iotest.DataErrReader(strings.NewReader(testData))},
+ {desc: "onebyte", r: iotest.OneByteReader(strings.NewReader(testData))},
+ {desc: "dataerr(onebyte)", r: iotest.DataErrReader(iotest.OneByteReader(strings.NewReader(testData)))},
+ }
+
+ for _, tc := range testCases {
+ data, err := ioutil.ReadAll(NewReader(receiverFromReader(tc.r)))
+ require.NoError(t, err, tc.desc)
+ require.Equal(t, testData, string(data), tc.desc)
+ }
+}
+
+func TestReadSizes(t *testing.T) {
+ testData := "Hello this is the test data that will be received. It goes on for a while bla bla bla."
+ for n := 1; n < 100; n *= 3 {
+ desc := fmt.Sprintf("reads of size %d", n)
+ buffer := make([]byte, n)
+ result := &bytes.Buffer{}
+ reader := &opaqueReader{NewReader(receiverFromReader(strings.NewReader(testData)))}
+ _, err := io.CopyBuffer(&opaqueWriter{result}, reader, buffer)
+ require.NoError(t, err, desc)
+ require.Equal(t, testData, result.String())
+ }
+}
+
+func receiverFromReader(r io.Reader) func() ([]byte, error) {
+ return func() ([]byte, error) {
+ data := make([]byte, 10)
+ n, err := r.Read(data)
+ return data[:n], err
+ }
+}
+
+// Hide io.WriteTo if it exists
+type opaqueReader struct {
+ io.Reader
+}
+
+// Hide io.ReadFrom if it exists
+type opaqueWriter struct {
+ io.Writer
+}