1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
|
package smarthttp
import (
"context"
"gitlab.com/gitlab-org/gitaly/v15/structerr"
"io"
"sync"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
log "github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/proto/v15/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/v15/internal/cache"
)
// infoRefCache serves info-refs responses through a streaming cache. See
// tryCache for the lookup and population logic.
type infoRefCache struct {
	// streamer is the cache backend that tryCache reads cached responses
	// from (GetStream) and stores freshly generated ones into (PutStream).
	streamer cache.Streamer
}
// newInfoRefCache returns an infoRefCache backed by the given streamer.
func newInfoRefCache(streamer cache.Streamer) infoRefCache {
	return infoRefCache{streamer: streamer}
}
var (
	// Prometheus metrics describing how the info-refs cache is used.

	// cacheAttemptTotal is incremented through countAttempt.
	cacheAttemptTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "gitaly_inforef_cache_attempt_total",
		Help: "Total number of smarthttp info-ref RPCs accessing the cache",
	})

	// hitMissTotals is partitioned by the "type" label; the hooks below
	// use the values "hit", "miss" and "err".
	hitMissTotals = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "gitaly_inforef_cache_hit_miss_total",
		Help: "Total number of smarthttp info-ref RPC hit/miss/err cache accesses",
	}, []string{"type"})

	// The counting hooks are package-level variables rather than plain
	// functions so that tests can easily swap them out.

	countAttempt = func() {
		cacheAttemptTotal.Inc()
	}
	countHit = func() {
		hitMissTotals.WithLabelValues("hit").Inc()
	}
	countMiss = func() {
		hitMissTotals.WithLabelValues("miss").Inc()
	}
	countErr = func() {
		hitMissTotals.WithLabelValues("err").Inc()
	}
)
// tryCache writes an info-refs response to w, going through the cache when
// the request allows it.
//
// Requests that carry Git config options or a Git protocol bypass the cache
// entirely: missFn is called directly with w. For all other requests the
// cache is queried via c.streamer.GetStream, with three possible outcomes:
//
//   - hit (nil error): the cached stream is copied to w.
//   - miss (cache.ErrReqNotFound): missFn generates the response into a pipe;
//     a goroutine passes the read side, teed into w, to c.streamer.PutStream,
//     so the same bytes reach both the cache and w.
//   - any other lookup error: the error is logged and missFn is called
//     directly with w.
//
// Parameters:
//   - ctx: passed to the cache calls and used to obtain the request logger.
//   - in: the request; it is also passed to the streamer alongside its
//     repository.
//   - w: destination for the response bytes.
//   - missFn: generates the response into the writer it is given.
//
// The returned error is whatever missFn returned, or an internal error when
// copying a cached stream to w fails. A failure to store the response in the
// cache is only logged and does not affect the return value.
func (c infoRefCache) tryCache(ctx context.Context, in *gitalypb.InfoRefsRequest, w io.Writer, missFn func(io.Writer) error) error {
	if len(in.GetGitConfigOptions()) > 0 ||
		len(in.GetGitProtocol()) > 0 {
		return missFn(w)
	}

	logger := ctxlogrus.Extract(ctx).WithFields(log.Fields{"service": uploadPackSvc})
	logger.Debug("Attempting to fetch cached response")
	countAttempt()

	stream, err := c.streamer.GetStream(ctx, in.GetRepository(), in)
	switch err {
	case nil:
		defer stream.Close()

		countHit()
		logger.Info("cache hit for UploadPack response")

		if _, err := io.Copy(w, stream); err != nil {
			return structerr.NewInternal("cache copy: %w", err)
		}

		return nil

	case cache.ErrReqNotFound:
		countMiss()
		logger.Info("cache miss for InfoRefsUploadPack response")

		// Deferred so that this function does not return before the
		// goroutine below has finished reading from the pipe (and hence
		// writing to w).
		var wg sync.WaitGroup
		defer wg.Wait()

		pr, pw := io.Pipe()

		wg.Add(1)
		go func() {
			defer wg.Done()

			// Every byte read from the pipe is also written to w, so the
			// reads performed on tr are what deliver the response to w.
			tr := io.TeeReader(pr, w)

			// Use the getter, matching the GetStream call above, rather than
			// reading the Repository field directly.
			if err := c.streamer.PutStream(ctx, in.GetRepository(), in, tr); err != nil {
				logger.Errorf("unable to store InfoRefsUploadPack response in cache: %q", err)

				// discard remaining bytes if caching stream
				// failed so that tee reader is not blocked
				_, err = io.Copy(io.Discard, tr)
				if err != nil {
					logger.WithError(err).
						Error("unable to discard remaining InfoRefsUploadPack cache stream")
				}
			}
		}()

		err = missFn(pw)
		// Closing the write side ends the reads in the goroutine above; with
		// a nil err the reader sees io.EOF, otherwise it sees err.
		_ = pw.CloseWithError(err) // always returns nil
		return err

	default:
		countErr()
		logger.Infof("unable to fetch cached response: %q", err)

		return missFn(w)
	}
}
|