diff options
author | Jacob Vosmaer <jacob@gitlab.com> | 2017-07-13 13:20:13 +0300 |
---|---|---|
committer | Jacob Vosmaer <jacob@gitlab.com> | 2017-07-13 13:20:53 +0300 |
commit | 96a75fa5965d92848ac7f7c8c9717f37112caa32 (patch) | |
tree | 052bef9f294fc7aef32dd5a6aa3b9c9e883d0e70 /streamio | |
parent | 7adbb3678dbfc72dd712405f1b7e3436d29b0052 (diff) |
Add environment variable for streamio buffer size
Diffstat (limited to 'streamio')
-rw-r--r-- | streamio/stream.go | 27 | ||||
-rw-r--r-- | streamio/stream_test.go | 16 |
2 files changed, 30 insertions, 13 deletions
diff --git a/streamio/stream.go b/streamio/stream.go index c1c71db66..379556afe 100644 --- a/streamio/stream.go +++ b/streamio/stream.go @@ -1,9 +1,22 @@ +// Package streamio contains wrappers intended for turning gRPC streams +// that send/receive messages with a []byte field into io.Writers and +// io.Readers. +// package streamio import ( "io" + "os" + "strconv" ) +func init() { + bufSize64, err := strconv.ParseInt(os.Getenv("GITALY_STREAMIO_WRITE_BUFFER_SIZE"), 10, 32) + if err == nil && bufSize64 > 0 { + WriteBufferSize = int(bufSize64) + } +} + // NewReader turns receiver into an io.Reader. Errors from the receiver // function are passed on unmodified. This means receiver should emit // io.EOF when done. @@ -64,12 +77,16 @@ func (rr *receiveReader) WriteTo(w io.Writer) (int64, error) { return written, errWrite } -// NewWriter turns sender into an io.Writer. +// NewWriter turns sender into an io.Writer. The sender callback will +// receive []byte arguments of length at most WriteBufferSize. func NewWriter(sender func(p []byte) error) io.Writer { return &sendWriter{sender: sender} } -var writeBufferSize = 128 * 1024 +// WriteBufferSize is the largest []byte that Write() will pass to its +// underlying send function. This value can be changed at runtime using +// the GITALY_STREAMIO_WRITE_BUFFER_SIZE environment variable. +var WriteBufferSize = 128 * 1024 type sendWriter struct { sender func([]byte) error @@ -80,8 +97,8 @@ func (sw *sendWriter) Write(p []byte) (int, error) { for len(p) > 0 { chunkSize := len(p) - if chunkSize > writeBufferSize { - chunkSize = writeBufferSize + if chunkSize > WriteBufferSize { + chunkSize = WriteBufferSize } if err := sw.sender(p[:chunkSize]); err != nil { @@ -98,7 +115,7 @@ func (sw *sendWriter) Write(p []byte) (int, error) { // ReadFrom implements io.ReaderFrom. func (sw *sendWriter) ReadFrom(r io.Reader) (int64, error) { var nRead int64 - buf := make([]byte, writeBufferSize) + buf := make([]byte, WriteBufferSize) var errRead, errSend error for errSend == nil && errRead != io.EOF { diff --git a/streamio/stream_test.go b/streamio/stream_test.go index d263a68ab..e4796913d 100644 --- a/streamio/stream_test.go +++ b/streamio/stream_test.go @@ -87,9 +87,9 @@ type opaqueWriter struct { func TestWriterChunking(t *testing.T) { defer func(oldBufferSize int) { - writeBufferSize = oldBufferSize - }(writeBufferSize) - writeBufferSize = 5 + WriteBufferSize = oldBufferSize + }(WriteBufferSize) + WriteBufferSize = 5 testData := "Hello this is some test data" ts := &testSender{} @@ -99,7 +99,7 @@ func TestWriterChunking(t *testing.T) { require.NoError(t, err) require.Equal(t, testData, string(bytes.Join(ts.sends, nil))) for _, send := range ts.sends { - require.True(t, len(send) <= writeBufferSize, "send calls may not exceed writeBufferSize") + require.True(t, len(send) <= WriteBufferSize, "send calls may not exceed WriteBufferSize") } } @@ -116,9 +116,9 @@ func (ts *testSender) send(p []byte) error { func TestReadFrom(t *testing.T) { defer func(oldBufferSize int) { - writeBufferSize = oldBufferSize - }(writeBufferSize) - writeBufferSize = 5 + WriteBufferSize = oldBufferSize + }(WriteBufferSize) + WriteBufferSize = 5 testData := "Hello this is the test data that will be received. It goes on for a while bla bla bla." testCases := []struct { @@ -139,7 +139,7 @@ func TestReadFrom(t *testing.T) { require.Equal(t, int64(len(testData)), n, tc.desc) require.Equal(t, testData, string(bytes.Join(ts.sends, nil)), tc.desc) for _, send := range ts.sends { - require.True(t, len(send) <= writeBufferSize, "send calls may not exceed writeBufferSize") + require.True(t, len(send) <= WriteBufferSize, "send calls may not exceed WriteBufferSize") } } } |