1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
package streamio
import (
"bytes"
"fmt"
"io"
"strings"
"sync"
"testing"
"testing/iotest"
"github.com/stretchr/testify/require"
)
func TestReceiveSources(t *testing.T) {
testData := "Hello this is the test data that will be received"
testCases := []struct {
desc string
r io.Reader
}{
{desc: "base", r: strings.NewReader(testData)},
{desc: "dataerr", r: iotest.DataErrReader(strings.NewReader(testData))},
{desc: "onebyte", r: iotest.OneByteReader(strings.NewReader(testData))},
{desc: "dataerr(onebyte)", r: iotest.DataErrReader(iotest.OneByteReader(strings.NewReader(testData)))},
}
for _, tc := range testCases {
data, err := io.ReadAll(&opaqueReader{NewReader(receiverFromReader(tc.r))})
require.NoError(t, err, tc.desc)
require.Equal(t, testData, string(data), tc.desc)
}
}
func TestReadSizes(t *testing.T) {
readSizes := func(t *testing.T, newReader func(string) io.Reader) {
testData := "Hello this is the test data that will be received. It goes on for a while bla bla bla."
for n := 1; n < 100; n *= 3 {
desc := fmt.Sprintf("reads of size %d", n)
result := &bytes.Buffer{}
reader := &opaqueReader{NewReader(receiverFromReader(newReader(testData)))}
n, err := io.CopyBuffer(&opaqueWriter{result}, reader, make([]byte, n))
require.NoError(t, err, desc)
require.Equal(t, testData, result.String())
require.EqualValues(t, len(testData), n)
}
}
t.Run("normal reader", func(t *testing.T) {
readSizes(t, func(s string) io.Reader {
return strings.NewReader(s)
})
})
t.Run("err reader", func(t *testing.T) {
readSizes(t, func(s string) io.Reader {
return iotest.DataErrReader(strings.NewReader(s))
})
})
}
func receiverFromReader(r io.Reader) func() ([]byte, error) {
return func() ([]byte, error) {
data := make([]byte, 10)
n, err := r.Read(data)
return data[:n], err
}
}
// Hide io.WriteTo if it exists
type opaqueReader struct {
io.Reader
}
// Hide io.ReadFrom if it exists
type opaqueWriter struct {
io.Writer
}
func TestWriterChunking(t *testing.T) {
defer func(oldBufferSize int) {
WriteBufferSize = oldBufferSize
}(WriteBufferSize)
WriteBufferSize = 5
testData := "Hello this is some test data"
ts := &testSender{}
w := NewWriter(ts.send)
_, err := io.CopyBuffer(&opaqueWriter{w}, strings.NewReader(testData), make([]byte, 10))
require.NoError(t, err)
require.Equal(t, testData, string(bytes.Join(ts.sends, nil)))
for _, send := range ts.sends {
require.True(t, len(send) <= WriteBufferSize, "send calls may not exceed WriteBufferSize")
}
}
func TestNewSyncWriter(t *testing.T) {
var m sync.Mutex
testData := "Hello this is some test data"
ts := &testSender{}
w := NewSyncWriter(&m, func(p []byte) error {
// As there is no way to check whether a mutex is locked already, we can just try to
// unlock it here. If the mutex wasn't locked, it would cause a runtime error. As
// there's no concurrent writers in this test, this is safe to do.
m.Unlock()
m.Lock()
return ts.send(p)
})
_, err := io.CopyBuffer(&opaqueWriter{w}, strings.NewReader(testData), make([]byte, 10))
require.NoError(t, err)
require.Equal(t, testData, string(bytes.Join(ts.sends, nil)))
}
type testSender struct {
sends [][]byte
}
func (ts *testSender) send(p []byte) error {
buf := make([]byte, len(p))
copy(buf, p)
ts.sends = append(ts.sends, buf)
return nil
}
|