From 28a2fcaf9d643de52371f0051bc2734d7236152f Mon Sep 17 00:00:00 2001 From: Jacob Vosmaer Date: Thu, 22 Jun 2017 16:42:43 +0200 Subject: Use stream helpers from top-level package --- streamio/stream.go | 44 ++++++++++++++++++++++++++++++++++ streamio/stream_test.go | 63 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 107 insertions(+) create mode 100644 streamio/stream.go create mode 100644 streamio/stream_test.go (limited to 'streamio') diff --git a/streamio/stream.go b/streamio/stream.go new file mode 100644 index 000000000..dafade8fb --- /dev/null +++ b/streamio/stream.go @@ -0,0 +1,44 @@ +package streamio + +import ( + "io" +) + +// NewReader turns receiver into an io.Reader. Errors from the receiver +// function are passed on unmodified. This means receiver should emit +// io.EOF when done. +func NewReader(receiver func() ([]byte, error)) io.Reader { + return &receiveReader{receiver: receiver} +} + +type receiveReader struct { + receiver func() ([]byte, error) + data []byte + err error +} + +func (rr *receiveReader) Read(p []byte) (int, error) { + if len(rr.data) == 0 { + rr.data, rr.err = rr.receiver() + } + n := copy(p, rr.data) + rr.data = rr.data[n:] + if len(rr.data) == 0 { + return n, rr.err + } + return n, nil +} + +// NewWriter turns sender into an io.Writer. The number of 'bytes +// written' reported back is always len(p). +func NewWriter(sender func(p []byte) error) io.Writer { + return &sendWriter{sender: sender} +} + +type sendWriter struct { + sender func([]byte) error +} + +func (sw *sendWriter) Write(p []byte) (int, error) { + return len(p), sw.sender(p) +} diff --git a/streamio/stream_test.go b/streamio/stream_test.go new file mode 100644 index 000000000..456d17e80 --- /dev/null +++ b/streamio/stream_test.go @@ -0,0 +1,63 @@ +package streamio + +import ( + "bytes" + "fmt" + "io" + "io/ioutil" + "strings" + "testing" + "testing/iotest" + + "github.com/stretchr/testify/require" +) + +func TestReceiveSources(t *testing.T) { + testData := "Hello this is the test data that will be received" + testCases := []struct { + desc string + r io.Reader + }{ + {desc: "base", r: strings.NewReader(testData)}, + {desc: "dataerr", r: iotest.DataErrReader(strings.NewReader(testData))}, + {desc: "onebyte", r: iotest.OneByteReader(strings.NewReader(testData))}, + {desc: "dataerr(onebyte)", r: iotest.DataErrReader(iotest.OneByteReader(strings.NewReader(testData)))}, + } + + for _, tc := range testCases { + data, err := ioutil.ReadAll(NewReader(receiverFromReader(tc.r))) + require.NoError(t, err, tc.desc) + require.Equal(t, testData, string(data), tc.desc) + } +} + +func TestReadSizes(t *testing.T) { + testData := "Hello this is the test data that will be received. It goes on for a while bla bla bla." + for n := 1; n < 100; n *= 3 { + desc := fmt.Sprintf("reads of size %d", n) + buffer := make([]byte, n) + result := &bytes.Buffer{} + reader := &opaqueReader{NewReader(receiverFromReader(strings.NewReader(testData)))} + _, err := io.CopyBuffer(&opaqueWriter{result}, reader, buffer) + require.NoError(t, err, desc) + require.Equal(t, testData, result.String()) + } +} + +func receiverFromReader(r io.Reader) func() ([]byte, error) { + return func() ([]byte, error) { + data := make([]byte, 10) + n, err := r.Read(data) + return data[:n], err + } +} + +// Hide io.WriteTo if it exists +type opaqueReader struct { + io.Reader +} + +// Hide io.ReadFrom if it exists +type opaqueWriter struct { + io.Writer +} -- cgit v1.2.3