Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'streamio/stream.go')
-rw-r--r--streamio/stream.go27
1 files changed, 22 insertions, 5 deletions
diff --git a/streamio/stream.go b/streamio/stream.go
index c1c71db66..379556afe 100644
--- a/streamio/stream.go
+++ b/streamio/stream.go
@@ -1,9 +1,22 @@
+// Package streamio contains wrappers intended for turning gRPC streams
+// that send/receive messages with a []byte field into io.Writers and
+// io.Readers.
+//
package streamio
import (
"io"
+ "os"
+ "strconv"
)
+func init() {
+ bufSize64, err := strconv.ParseInt(os.Getenv("GITALY_STREAMIO_WRITE_BUFFER_SIZE"), 10, 32)
+ if err == nil && bufSize64 > 0 {
+ WriteBufferSize = int(bufSize64)
+ }
+}
+
// NewReader turns receiver into an io.Reader. Errors from the receiver
// function are passed on unmodified. This means receiver should emit
// io.EOF when done.
@@ -64,12 +77,16 @@ func (rr *receiveReader) WriteTo(w io.Writer) (int64, error) {
return written, errWrite
}
-// NewWriter turns sender into an io.Writer.
+// NewWriter turns sender into an io.Writer. The sender callback will
+// receive []byte arguments of length at most WriteBufferSize.
func NewWriter(sender func(p []byte) error) io.Writer {
return &sendWriter{sender: sender}
}
-var writeBufferSize = 128 * 1024
+// WriteBufferSize is the largest []byte that Write() will pass to its
+// underlying send function. This value can be changed at runtime using
+// the GITALY_STREAMIO_WRITE_BUFFER_SIZE environment variable.
+var WriteBufferSize = 128 * 1024
type sendWriter struct {
sender func([]byte) error
@@ -80,8 +97,8 @@ func (sw *sendWriter) Write(p []byte) (int, error) {
for len(p) > 0 {
chunkSize := len(p)
- if chunkSize > writeBufferSize {
- chunkSize = writeBufferSize
+ if chunkSize > WriteBufferSize {
+ chunkSize = WriteBufferSize
}
if err := sw.sender(p[:chunkSize]); err != nil {
@@ -98,7 +115,7 @@ func (sw *sendWriter) Write(p []byte) (int, error) {
// ReadFrom implements io.ReaderFrom.
func (sw *sendWriter) ReadFrom(r io.Reader) (int64, error) {
var nRead int64
- buf := make([]byte, writeBufferSize)
+ buf := make([]byte, WriteBufferSize)
var errRead, errSend error
for errSend == nil && errRead != io.EOF {