Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJacob Vosmaer <jacob@gitlab.com>2017-07-13 13:20:13 +0300
committerJacob Vosmaer <jacob@gitlab.com>2017-07-13 13:20:53 +0300
commit96a75fa5965d92848ac7f7c8c9717f37112caa32 (patch)
tree052bef9f294fc7aef32dd5a6aa3b9c9e883d0e70 /streamio
parent7adbb3678dbfc72dd712405f1b7e3436d29b0052 (diff)
Add environment variable for streamio buffer size
Diffstat (limited to 'streamio')
-rw-r--r--streamio/stream.go27
-rw-r--r--streamio/stream_test.go16
2 files changed, 30 insertions, 13 deletions
diff --git a/streamio/stream.go b/streamio/stream.go
index c1c71db66..379556afe 100644
--- a/streamio/stream.go
+++ b/streamio/stream.go
@@ -1,9 +1,22 @@
+// Package streamio contains wrappers intended for turning gRPC streams
+// that send/receive messages with a []byte field into io.Writers and
+// io.Readers.
+//
package streamio
import (
"io"
+ "os"
+ "strconv"
)
+func init() {
+ bufSize64, err := strconv.ParseInt(os.Getenv("GITALY_STREAMIO_WRITE_BUFFER_SIZE"), 10, 32)
+ if err == nil && bufSize64 > 0 {
+ WriteBufferSize = int(bufSize64)
+ }
+}
+
// NewReader turns receiver into an io.Reader. Errors from the receiver
// function are passed on unmodified. This means receiver should emit
// io.EOF when done.
@@ -64,12 +77,16 @@ func (rr *receiveReader) WriteTo(w io.Writer) (int64, error) {
return written, errWrite
}
-// NewWriter turns sender into an io.Writer.
+// NewWriter turns sender into an io.Writer. The sender callback will
+// receive []byte arguments of length at most WriteBufferSize.
func NewWriter(sender func(p []byte) error) io.Writer {
return &sendWriter{sender: sender}
}
-var writeBufferSize = 128 * 1024
+// WriteBufferSize is the largest []byte that Write() will pass to its
+// underlying send function. This value can be changed at runtime using
+// the GITALY_STREAMIO_WRITE_BUFFER_SIZE environment variable.
+var WriteBufferSize = 128 * 1024
type sendWriter struct {
sender func([]byte) error
@@ -80,8 +97,8 @@ func (sw *sendWriter) Write(p []byte) (int, error) {
for len(p) > 0 {
chunkSize := len(p)
- if chunkSize > writeBufferSize {
- chunkSize = writeBufferSize
+ if chunkSize > WriteBufferSize {
+ chunkSize = WriteBufferSize
}
if err := sw.sender(p[:chunkSize]); err != nil {
@@ -98,7 +115,7 @@ func (sw *sendWriter) Write(p []byte) (int, error) {
// ReadFrom implements io.ReaderFrom.
func (sw *sendWriter) ReadFrom(r io.Reader) (int64, error) {
var nRead int64
- buf := make([]byte, writeBufferSize)
+ buf := make([]byte, WriteBufferSize)
var errRead, errSend error
for errSend == nil && errRead != io.EOF {
diff --git a/streamio/stream_test.go b/streamio/stream_test.go
index d263a68ab..e4796913d 100644
--- a/streamio/stream_test.go
+++ b/streamio/stream_test.go
@@ -87,9 +87,9 @@ type opaqueWriter struct {
func TestWriterChunking(t *testing.T) {
defer func(oldBufferSize int) {
- writeBufferSize = oldBufferSize
- }(writeBufferSize)
- writeBufferSize = 5
+ WriteBufferSize = oldBufferSize
+ }(WriteBufferSize)
+ WriteBufferSize = 5
testData := "Hello this is some test data"
ts := &testSender{}
@@ -99,7 +99,7 @@ func TestWriterChunking(t *testing.T) {
require.NoError(t, err)
require.Equal(t, testData, string(bytes.Join(ts.sends, nil)))
for _, send := range ts.sends {
- require.True(t, len(send) <= writeBufferSize, "send calls may not exceed writeBufferSize")
+ require.True(t, len(send) <= WriteBufferSize, "send calls may not exceed WriteBufferSize")
}
}
@@ -116,9 +116,9 @@ func (ts *testSender) send(p []byte) error {
func TestReadFrom(t *testing.T) {
defer func(oldBufferSize int) {
- writeBufferSize = oldBufferSize
- }(writeBufferSize)
- writeBufferSize = 5
+ WriteBufferSize = oldBufferSize
+ }(WriteBufferSize)
+ WriteBufferSize = 5
testData := "Hello this is the test data that will be received. It goes on for a while bla bla bla."
testCases := []struct {
@@ -139,7 +139,7 @@ func TestReadFrom(t *testing.T) {
require.Equal(t, int64(len(testData)), n, tc.desc)
require.Equal(t, testData, string(bytes.Join(ts.sends, nil)), tc.desc)
for _, send := range ts.sends {
- require.True(t, len(send) <= writeBufferSize, "send calls may not exceed writeBufferSize")
+ require.True(t, len(send) <= WriteBufferSize, "send calls may not exceed WriteBufferSize")
}
}
}