Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'streamio')
-rw-r--r--streamio/stream.go99
1 files changed, 11 insertions, 88 deletions
diff --git a/streamio/stream.go b/streamio/stream.go
index d3074d9f8..347f20731 100644
--- a/streamio/stream.go
+++ b/streamio/stream.go
@@ -5,33 +5,10 @@
package streamio
import (
- "errors"
"io"
- "os"
- "strconv"
"sync"
-
- "github.com/prometheus/client_golang/prometheus"
-)
-
-var (
- methodCount = prometheus.NewCounterVec(
- prometheus.CounterOpts{
- Name: "gitaly_streamio_method_calls_total",
- Help: "Usage counters of gitaly streamio methods",
- }, []string{"method"},
- )
)
-func init() {
- prometheus.MustRegister(methodCount)
-
- bufSize64, err := strconv.ParseInt(os.Getenv("GITALY_STREAMIO_WRITE_BUFFER_SIZE"), 10, 32)
- if err == nil && bufSize64 > 0 {
- WriteBufferSize = int(bufSize64)
- }
-}
-
// NewReader turns receiver into an io.Reader. Errors from the receiver
// function are passed on unmodified. This means receiver should emit
// io.EOF when done.
@@ -45,11 +22,7 @@ type receiveReader struct {
err error
}
-func countMethod(method string) { methodCount.WithLabelValues(method).Inc() }
-
func (rr *receiveReader) Read(p []byte) (int, error) {
- countMethod("reader.Read")
-
if len(rr.data) == 0 {
rr.data, rr.err = rr.receiver()
}
@@ -67,42 +40,13 @@ func (rr *receiveReader) Read(p []byte) (int, error) {
return n, nil
}
-// WriteTo implements io.WriterTo.
-func (rr *receiveReader) WriteTo(w io.Writer) (int64, error) {
- countMethod("reader.WriteTo")
-
- var written int64
-
- // Deal with left-over state in rr.data and rr.err, if any
- if len(rr.data) > 0 {
- n, err := w.Write(rr.data)
- written += int64(n)
- if err != nil {
- return written, err
- }
- }
- if rr.err != nil {
- return written, rr.err
- }
-
- // Consume the response stream
- var errRead, errWrite error
- var n int
- var buf []byte
- for errWrite == nil && errRead != io.EOF {
- buf, errRead = rr.receiver()
- if errRead != nil && errRead != io.EOF {
- return written, errRead
- }
+// wrapReader hides WriteTo to prevent inifinite recursion
+type wrapReader struct{ io.Reader }
- if len(buf) > 0 {
- n, errWrite = w.Write(buf)
- written += int64(n)
- }
- }
+func (wr *wrapReader) Read(p []byte) (int, error) { return wr.Reader.Read(p) }
- return written, errWrite
-}
+// Deprecated: will be removed in v14. Use io.Copy instead.
+func (rr *receiveReader) WriteTo(w io.Writer) (int64, error) { return io.Copy(w, &wrapReader{rr}) }
// NewWriter turns sender into an io.Writer. The sender callback will
// receive []byte arguments of length at most WriteBufferSize.
@@ -125,8 +69,7 @@ func NewSyncWriter(m *sync.Mutex, sender func(p []byte) error) io.Writer {
}
// WriteBufferSize is the largest []byte that Write() will pass to its
-// underlying send function. This value can be changed at runtime using
-// the GITALY_STREAMIO_WRITE_BUFFER_SIZE environment variable.
+// underlying send function.
var WriteBufferSize = 128 * 1024
type sendWriter struct {
@@ -134,8 +77,6 @@ type sendWriter struct {
}
func (sw *sendWriter) Write(p []byte) (int, error) {
- countMethod("writer.Write")
-
var sent int
for len(p) > 0 {
@@ -155,28 +96,10 @@ func (sw *sendWriter) Write(p []byte) (int, error) {
return sent, nil
}
-// ReadFrom implements io.ReaderFrom.
-func (sw *sendWriter) ReadFrom(r io.Reader) (int64, error) {
- countMethod("writer.ReadFrom")
+// wrapWriter hides ReadFrom to prevent inifinite recursion
+type wrapWriter struct{ io.Writer }
- var nRead int64
- buf := make([]byte, WriteBufferSize)
+func (ww *wrapWriter) Write(p []byte) (int, error) { return ww.Writer.Write(p) }
- for {
- n, err := r.Read(buf)
- nRead += int64(n)
-
- if n > 0 {
- if err := sw.sender(buf[:n]); err != nil {
- return nRead, err
- }
- }
-
- if err != nil {
- if errors.Is(err, io.EOF) {
- return nRead, nil
- }
- return nRead, err
- }
- }
-}
+// Deprecated: will be removed in v14. Use io.Copy instead.
+func (sw *sendWriter) ReadFrom(r io.Reader) (int64, error) { return io.Copy(&wrapWriter{sw}, r) }