diff options
author | Jacob Vosmaer <jacob@gitlab.com> | 2021-02-25 18:46:33 +0300 |
---|---|---|
committer | Jacob Vosmaer <jacob@gitlab.com> | 2021-03-01 15:59:52 +0300 |
commit | d558a7fe2876fc482d397452a403856a0e154f6b (patch) | |
tree | e340a71c21ccbd00bf00c9ffe19b8caf78483e70 /streamio | |
parent | ef061fd0ccb16fadf3d8550c12b27c4cb3159990 (diff) |
streamio: remove custom ReadFrom and WriteTo
This replaces sendWriter.ReadFrom and receiveReader.WriteTo with
trivial implementations based on io.Copy, to be removed in 14.0.
The purpose of the io.ReaderFrom and io.WriterTo interfaces is to
optimize away a buffer allocation in io.Copy, and/or being able to do
something smart like calling sendfile(2). Without them, io.Copy
allocates a 32KB buffer and reads into / writes from that buffer in a
loop.
The thing is, sendWriter.ReadFrom and receiveReader.WriteTo neither
make a difference for allocations, nor do they do something "smart".
First of all, sendWriter.ReadFrom was not helping the buffer
allocation efficiency because it allocated its own 128KB buffer. In
fact, sendWriter.ReadFrom is equivalent to what io.Copy does on the
inside, so we might as well let io.Copy do that.
That leaves receiveReader.WriteTo as a potentially useful
optimization. It does prevent io.Copy from allocating that 32KB
buffer. However, everytime it calls receiver(), gRPC allocates a new
byte slice anyway, in a loop. Avoiding that one 32KB allocation
outside the loop does not seem worth it.
Diffstat (limited to 'streamio')
-rw-r--r-- | streamio/stream.go | 99 |
1 files changed, 11 insertions, 88 deletions
diff --git a/streamio/stream.go b/streamio/stream.go index d3074d9f8..347f20731 100644 --- a/streamio/stream.go +++ b/streamio/stream.go @@ -5,33 +5,10 @@ package streamio import ( - "errors" "io" - "os" - "strconv" "sync" - - "github.com/prometheus/client_golang/prometheus" -) - -var ( - methodCount = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "gitaly_streamio_method_calls_total", - Help: "Usage counters of gitaly streamio methods", - }, []string{"method"}, - ) ) -func init() { - prometheus.MustRegister(methodCount) - - bufSize64, err := strconv.ParseInt(os.Getenv("GITALY_STREAMIO_WRITE_BUFFER_SIZE"), 10, 32) - if err == nil && bufSize64 > 0 { - WriteBufferSize = int(bufSize64) - } -} - // NewReader turns receiver into an io.Reader. Errors from the receiver // function are passed on unmodified. This means receiver should emit // io.EOF when done. @@ -45,11 +22,7 @@ type receiveReader struct { err error } -func countMethod(method string) { methodCount.WithLabelValues(method).Inc() } - func (rr *receiveReader) Read(p []byte) (int, error) { - countMethod("reader.Read") - if len(rr.data) == 0 { rr.data, rr.err = rr.receiver() } @@ -67,42 +40,13 @@ func (rr *receiveReader) Read(p []byte) (int, error) { return n, nil } -// WriteTo implements io.WriterTo. -func (rr *receiveReader) WriteTo(w io.Writer) (int64, error) { - countMethod("reader.WriteTo") - - var written int64 - - // Deal with left-over state in rr.data and rr.err, if any - if len(rr.data) > 0 { - n, err := w.Write(rr.data) - written += int64(n) - if err != nil { - return written, err - } - } - if rr.err != nil { - return written, rr.err - } - - // Consume the response stream - var errRead, errWrite error - var n int - var buf []byte - for errWrite == nil && errRead != io.EOF { - buf, errRead = rr.receiver() - if errRead != nil && errRead != io.EOF { - return written, errRead - } +// wrapReader hides WriteTo to prevent inifinite recursion +type wrapReader struct{ io.Reader } - if len(buf) > 0 { - n, errWrite = w.Write(buf) - written += int64(n) - } - } +func (wr *wrapReader) Read(p []byte) (int, error) { return wr.Reader.Read(p) } - return written, errWrite -} +// Deprecated: will be removed in v14. Use io.Copy instead. +func (rr *receiveReader) WriteTo(w io.Writer) (int64, error) { return io.Copy(w, &wrapReader{rr}) } // NewWriter turns sender into an io.Writer. The sender callback will // receive []byte arguments of length at most WriteBufferSize. @@ -125,8 +69,7 @@ func NewSyncWriter(m *sync.Mutex, sender func(p []byte) error) io.Writer { } // WriteBufferSize is the largest []byte that Write() will pass to its -// underlying send function. This value can be changed at runtime using -// the GITALY_STREAMIO_WRITE_BUFFER_SIZE environment variable. +// underlying send function. var WriteBufferSize = 128 * 1024 type sendWriter struct { @@ -134,8 +77,6 @@ type sendWriter struct { } func (sw *sendWriter) Write(p []byte) (int, error) { - countMethod("writer.Write") - var sent int for len(p) > 0 { @@ -155,28 +96,10 @@ func (sw *sendWriter) Write(p []byte) (int, error) { return sent, nil } -// ReadFrom implements io.ReaderFrom. -func (sw *sendWriter) ReadFrom(r io.Reader) (int64, error) { - countMethod("writer.ReadFrom") +// wrapWriter hides ReadFrom to prevent inifinite recursion +type wrapWriter struct{ io.Writer } - var nRead int64 - buf := make([]byte, WriteBufferSize) +func (ww *wrapWriter) Write(p []byte) (int, error) { return ww.Writer.Write(p) } - for { - n, err := r.Read(buf) - nRead += int64(n) - - if n > 0 { - if err := sw.sender(buf[:n]); err != nil { - return nRead, err - } - } - - if err != nil { - if errors.Is(err, io.EOF) { - return nRead, nil - } - return nRead, err - } - } -} +// Deprecated: will be removed in v14. Use io.Copy instead. +func (sw *sendWriter) ReadFrom(r io.Reader) (int64, error) { return io.Copy(&wrapWriter{sw}, r) } |