diff options
Diffstat (limited to 'streamio')
-rw-r--r-- | streamio/stream.go | 99 |
1 files changed, 11 insertions, 88 deletions
diff --git a/streamio/stream.go b/streamio/stream.go index d3074d9f8..347f20731 100644 --- a/streamio/stream.go +++ b/streamio/stream.go @@ -5,33 +5,10 @@ package streamio import ( - "errors" "io" - "os" - "strconv" "sync" - - "github.com/prometheus/client_golang/prometheus" -) - -var ( - methodCount = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "gitaly_streamio_method_calls_total", - Help: "Usage counters of gitaly streamio methods", - }, []string{"method"}, - ) ) -func init() { - prometheus.MustRegister(methodCount) - - bufSize64, err := strconv.ParseInt(os.Getenv("GITALY_STREAMIO_WRITE_BUFFER_SIZE"), 10, 32) - if err == nil && bufSize64 > 0 { - WriteBufferSize = int(bufSize64) - } -} - // NewReader turns receiver into an io.Reader. Errors from the receiver // function are passed on unmodified. This means receiver should emit // io.EOF when done. @@ -45,11 +22,7 @@ type receiveReader struct { err error } -func countMethod(method string) { methodCount.WithLabelValues(method).Inc() } - func (rr *receiveReader) Read(p []byte) (int, error) { - countMethod("reader.Read") - if len(rr.data) == 0 { rr.data, rr.err = rr.receiver() } @@ -67,42 +40,13 @@ func (rr *receiveReader) Read(p []byte) (int, error) { return n, nil } -// WriteTo implements io.WriterTo. -func (rr *receiveReader) WriteTo(w io.Writer) (int64, error) { - countMethod("reader.WriteTo") - - var written int64 - - // Deal with left-over state in rr.data and rr.err, if any - if len(rr.data) > 0 { - n, err := w.Write(rr.data) - written += int64(n) - if err != nil { - return written, err - } - } - if rr.err != nil { - return written, rr.err - } - - // Consume the response stream - var errRead, errWrite error - var n int - var buf []byte - for errWrite == nil && errRead != io.EOF { - buf, errRead = rr.receiver() - if errRead != nil && errRead != io.EOF { - return written, errRead - } +// wrapReader hides WriteTo to prevent inifinite recursion +type wrapReader struct{ io.Reader } - if len(buf) > 0 { - n, errWrite = w.Write(buf) - written += int64(n) - } - } +func (wr *wrapReader) Read(p []byte) (int, error) { return wr.Reader.Read(p) } - return written, errWrite -} +// Deprecated: will be removed in v14. Use io.Copy instead. +func (rr *receiveReader) WriteTo(w io.Writer) (int64, error) { return io.Copy(w, &wrapReader{rr}) } // NewWriter turns sender into an io.Writer. The sender callback will // receive []byte arguments of length at most WriteBufferSize. @@ -125,8 +69,7 @@ func NewSyncWriter(m *sync.Mutex, sender func(p []byte) error) io.Writer { } // WriteBufferSize is the largest []byte that Write() will pass to its -// underlying send function. This value can be changed at runtime using -// the GITALY_STREAMIO_WRITE_BUFFER_SIZE environment variable. +// underlying send function. var WriteBufferSize = 128 * 1024 type sendWriter struct { @@ -134,8 +77,6 @@ type sendWriter struct { } func (sw *sendWriter) Write(p []byte) (int, error) { - countMethod("writer.Write") - var sent int for len(p) > 0 { @@ -155,28 +96,10 @@ func (sw *sendWriter) Write(p []byte) (int, error) { return sent, nil } -// ReadFrom implements io.ReaderFrom. -func (sw *sendWriter) ReadFrom(r io.Reader) (int64, error) { - countMethod("writer.ReadFrom") +// wrapWriter hides ReadFrom to prevent inifinite recursion +type wrapWriter struct{ io.Writer } - var nRead int64 - buf := make([]byte, WriteBufferSize) +func (ww *wrapWriter) Write(p []byte) (int, error) { return ww.Writer.Write(p) } - for { - n, err := r.Read(buf) - nRead += int64(n) - - if n > 0 { - if err := sw.sender(buf[:n]); err != nil { - return nRead, err - } - } - - if err != nil { - if errors.Is(err, io.EOF) { - return nRead, nil - } - return nRead, err - } - } -} +// Deprecated: will be removed in v14. Use io.Copy instead. +func (sw *sendWriter) ReadFrom(r io.Reader) (int64, error) { return io.Copy(&wrapWriter{sw}, r) } |