Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJacob Vosmaer <jacob@gitlab.com>2021-02-25 18:46:33 +0300
committerJacob Vosmaer <jacob@gitlab.com>2021-03-01 15:59:52 +0300
commitd558a7fe2876fc482d397452a403856a0e154f6b (patch)
treee340a71c21ccbd00bf00c9ffe19b8caf78483e70 /streamio
parentef061fd0ccb16fadf3d8550c12b27c4cb3159990 (diff)
streamio: remove custom ReadFrom and WriteTo
This replaces sendWriter.ReadFrom and receiveReader.WriteTo with trivial implementations based on io.Copy, to be removed in 14.0. The purpose of the io.ReaderFrom and io.WriterTo interfaces is to optimize away a buffer allocation in io.Copy, and/or being able to do something smart like calling sendfile(2). Without them, io.Copy allocates a 32KB buffer and reads into / writes from that buffer in a loop. The thing is, sendWriter.ReadFrom and receiveReader.WriteTo neither make a difference for allocations, nor do they do something "smart". First of all, sendWriter.ReadFrom was not helping the buffer allocation efficiency because it allocated its own 128KB buffer. In fact, sendWriter.ReadFrom is equivalent to what io.Copy does on the inside, so we might as well let io.Copy do that. That leaves receiveReader.WriteTo as a potentially useful optimization. It does prevent io.Copy from allocating that 32KB buffer. However, everytime it calls receiver(), gRPC allocates a new byte slice anyway, in a loop. Avoiding that one 32KB allocation outside the loop does not seem worth it.
Diffstat (limited to 'streamio')
-rw-r--r--streamio/stream.go99
1 files changed, 11 insertions, 88 deletions
diff --git a/streamio/stream.go b/streamio/stream.go
index d3074d9f8..347f20731 100644
--- a/streamio/stream.go
+++ b/streamio/stream.go
@@ -5,33 +5,10 @@
package streamio
import (
- "errors"
"io"
- "os"
- "strconv"
"sync"
-
- "github.com/prometheus/client_golang/prometheus"
-)
-
-var (
- methodCount = prometheus.NewCounterVec(
- prometheus.CounterOpts{
- Name: "gitaly_streamio_method_calls_total",
- Help: "Usage counters of gitaly streamio methods",
- }, []string{"method"},
- )
)
-func init() {
- prometheus.MustRegister(methodCount)
-
- bufSize64, err := strconv.ParseInt(os.Getenv("GITALY_STREAMIO_WRITE_BUFFER_SIZE"), 10, 32)
- if err == nil && bufSize64 > 0 {
- WriteBufferSize = int(bufSize64)
- }
-}
-
// NewReader turns receiver into an io.Reader. Errors from the receiver
// function are passed on unmodified. This means receiver should emit
// io.EOF when done.
@@ -45,11 +22,7 @@ type receiveReader struct {
err error
}
-func countMethod(method string) { methodCount.WithLabelValues(method).Inc() }
-
func (rr *receiveReader) Read(p []byte) (int, error) {
- countMethod("reader.Read")
-
if len(rr.data) == 0 {
rr.data, rr.err = rr.receiver()
}
@@ -67,42 +40,13 @@ func (rr *receiveReader) Read(p []byte) (int, error) {
return n, nil
}
-// WriteTo implements io.WriterTo.
-func (rr *receiveReader) WriteTo(w io.Writer) (int64, error) {
- countMethod("reader.WriteTo")
-
- var written int64
-
- // Deal with left-over state in rr.data and rr.err, if any
- if len(rr.data) > 0 {
- n, err := w.Write(rr.data)
- written += int64(n)
- if err != nil {
- return written, err
- }
- }
- if rr.err != nil {
- return written, rr.err
- }
-
- // Consume the response stream
- var errRead, errWrite error
- var n int
- var buf []byte
- for errWrite == nil && errRead != io.EOF {
- buf, errRead = rr.receiver()
- if errRead != nil && errRead != io.EOF {
- return written, errRead
- }
+// wrapReader hides WriteTo to prevent inifinite recursion
+type wrapReader struct{ io.Reader }
- if len(buf) > 0 {
- n, errWrite = w.Write(buf)
- written += int64(n)
- }
- }
+func (wr *wrapReader) Read(p []byte) (int, error) { return wr.Reader.Read(p) }
- return written, errWrite
-}
+// Deprecated: will be removed in v14. Use io.Copy instead.
+func (rr *receiveReader) WriteTo(w io.Writer) (int64, error) { return io.Copy(w, &wrapReader{rr}) }
// NewWriter turns sender into an io.Writer. The sender callback will
// receive []byte arguments of length at most WriteBufferSize.
@@ -125,8 +69,7 @@ func NewSyncWriter(m *sync.Mutex, sender func(p []byte) error) io.Writer {
}
// WriteBufferSize is the largest []byte that Write() will pass to its
-// underlying send function. This value can be changed at runtime using
-// the GITALY_STREAMIO_WRITE_BUFFER_SIZE environment variable.
+// underlying send function.
var WriteBufferSize = 128 * 1024
type sendWriter struct {
@@ -134,8 +77,6 @@ type sendWriter struct {
}
func (sw *sendWriter) Write(p []byte) (int, error) {
- countMethod("writer.Write")
-
var sent int
for len(p) > 0 {
@@ -155,28 +96,10 @@ func (sw *sendWriter) Write(p []byte) (int, error) {
return sent, nil
}
-// ReadFrom implements io.ReaderFrom.
-func (sw *sendWriter) ReadFrom(r io.Reader) (int64, error) {
- countMethod("writer.ReadFrom")
+// wrapWriter hides ReadFrom to prevent inifinite recursion
+type wrapWriter struct{ io.Writer }
- var nRead int64
- buf := make([]byte, WriteBufferSize)
+func (ww *wrapWriter) Write(p []byte) (int, error) { return ww.Writer.Write(p) }
- for {
- n, err := r.Read(buf)
- nRead += int64(n)
-
- if n > 0 {
- if err := sw.sender(buf[:n]); err != nil {
- return nRead, err
- }
- }
-
- if err != nil {
- if errors.Is(err, io.EOF) {
- return nRead, nil
- }
- return nRead, err
- }
- }
-}
+// Deprecated: will be removed in v14. Use io.Copy instead.
+func (sw *sendWriter) ReadFrom(r io.Reader) (int64, error) { return io.Copy(&wrapWriter{sw}, r) }