diff options
Diffstat (limited to 'streamio/stream.go')
-rw-r--r-- | streamio/stream.go | 27 |
1 files changed, 22 insertions, 5 deletions
diff --git a/streamio/stream.go b/streamio/stream.go index c1c71db66..379556afe 100644 --- a/streamio/stream.go +++ b/streamio/stream.go @@ -1,9 +1,22 @@ +// Package streamio contains wrappers intended for turning gRPC streams +// that send/receive messages with a []byte field into io.Writers and +// io.Readers. +// package streamio import ( "io" + "os" + "strconv" ) +func init() { + bufSize64, err := strconv.ParseInt(os.Getenv("GITALY_STREAMIO_WRITE_BUFFER_SIZE"), 10, 32) + if err == nil && bufSize64 > 0 { + WriteBufferSize = int(bufSize64) + } +} + // NewReader turns receiver into an io.Reader. Errors from the receiver // function are passed on unmodified. This means receiver should emit // io.EOF when done. @@ -64,12 +77,16 @@ func (rr *receiveReader) WriteTo(w io.Writer) (int64, error) { return written, errWrite } -// NewWriter turns sender into an io.Writer. +// NewWriter turns sender into an io.Writer. The sender callback will +// receive []byte arguments of length at most WriteBufferSize. func NewWriter(sender func(p []byte) error) io.Writer { return &sendWriter{sender: sender} } -var writeBufferSize = 128 * 1024 +// WriteBufferSize is the largest []byte that Write() will pass to its +// underlying send function. This value can be changed at runtime using +// the GITALY_STREAMIO_WRITE_BUFFER_SIZE environment variable. +var WriteBufferSize = 128 * 1024 type sendWriter struct { sender func([]byte) error @@ -80,8 +97,8 @@ func (sw *sendWriter) Write(p []byte) (int, error) { for len(p) > 0 { chunkSize := len(p) - if chunkSize > writeBufferSize { - chunkSize = writeBufferSize + if chunkSize > WriteBufferSize { + chunkSize = WriteBufferSize } if err := sw.sender(p[:chunkSize]); err != nil { @@ -98,7 +115,7 @@ func (sw *sendWriter) Write(p []byte) (int, error) { // ReadFrom implements io.ReaderFrom. func (sw *sendWriter) ReadFrom(r io.Reader) (int64, error) { var nRead int64 - buf := make([]byte, writeBufferSize) + buf := make([]byte, WriteBufferSize) var errRead, errSend error for errSend == nil && errRead != io.EOF { |