Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Okstad <pokstad@gitlab.com>2019-11-21 18:28:19 +0300
committerPaul Okstad <pokstad@gitlab.com>2019-11-21 18:28:19 +0300
commit27e6ab351740e99e396d3cc92839f78eb95e7176 (patch)
tree8095c59c254119edc1dea2baa78d8d5c64fce2c7
parent86e6225ecd1cade8965e727da192c210ea10e6b4 (diff)
Add new Prometheus metrics to investigate write pressure
-rw-r--r--internal/cache/cachedb.go47
-rw-r--r--internal/cache/cachedb_test.go82
-rw-r--r--internal/cache/export_test.go2
-rw-r--r--internal/cache/prometheus.go8
4 files changed, 138 insertions, 1 deletions
diff --git a/internal/cache/cachedb.go b/internal/cache/cachedb.go
index 82546f5ce..fc717134a 100644
--- a/internal/cache/cachedb.go
+++ b/internal/cache/cachedb.go
@@ -6,21 +6,57 @@ import (
"io"
"os"
"path/filepath"
+ "sync"
"github.com/golang/protobuf/proto"
"gitlab.com/gitlab-org/gitaly/internal/safe"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
)
+// maps a cache path to the number of active writers
+type activeFiles struct {
+ *sync.Mutex
+ m map[string]int
+}
+
+// trackFile returns a function that indicates if the current
+// writing of a file is the last known one, which
+// would indicate the current write is the "winner".
+func (af activeFiles) trackFile(path string) func() bool {
+ af.Lock()
+ defer af.Unlock()
+
+ af.m[path]++
+
+ return func() bool {
+ af.Lock()
+ defer af.Unlock()
+
+ af.m[path]--
+
+ winner := af.m[path] == 0
+ if winner {
+ delete(af.m, path) // reclaim memory
+ }
+
+ return winner
+ }
+}
+
// StreamDB stores and retrieves byte streams for repository related RPCs
type StreamDB struct {
ck Keyer
+ af activeFiles
}
// NewStreamDB will open the stream database at the specified file path.
func NewStreamDB(ck Keyer) *StreamDB {
return &StreamDB{
ck: ck,
+ af: activeFiles{
+ Mutex: &sync.Mutex{},
+ m: map[string]int{},
+ },
}
}
@@ -79,6 +115,14 @@ func (sdb *StreamDB) PutStream(ctx context.Context, repo *gitalypb.Repository, r
return err
}
+ var n int64
+ isWinner := sdb.af.trackFile(reqPath)
+ defer func() {
+ if !isWinner() {
+ countLoserBytes(float64(n))
+ }
+ }()
+
if err := os.MkdirAll(filepath.Dir(reqPath), 0755); err != nil {
return err
}
@@ -89,13 +133,14 @@ func (sdb *StreamDB) PutStream(ctx context.Context, repo *gitalypb.Repository, r
}
defer sf.Close()
- n, err := io.Copy(sf, src)
+ n, err = io.Copy(sf, src)
if err != nil {
return err
}
countWriteBytes(float64(n))
if err := sf.Commit(); err != nil {
+ errTotal.WithLabelValues("ErrSafefileCommit").Inc()
return err
}
diff --git a/internal/cache/cachedb_test.go b/internal/cache/cachedb_test.go
index 1fe267fa8..f556051cf 100644
--- a/internal/cache/cachedb_test.go
+++ b/internal/cache/cachedb_test.go
@@ -2,13 +2,18 @@ package cache_test
import (
"context"
+ "io"
"io/ioutil"
+ "os"
+ "path/filepath"
"strings"
+ "sync"
"testing"
"time"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/internal/cache"
+ "gitlab.com/gitlab-org/gitaly/internal/config"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
)
@@ -107,3 +112,80 @@ func TestStreamDBNaiveKeyer(t *testing.T) {
_, err = keyer.StartLease(req1.Repository)
require.NoError(t, err)
}
+
+func injectTempStorage(t testing.TB) (string, testhelper.Cleanup) {
+ oldStorages := config.Config.Storages
+ tmpDir, err := ioutil.TempDir("", t.Name())
+ require.NoError(t, err)
+
+ name := filepath.Base(tmpDir)
+ config.Config.Storages = append(config.Config.Storages, config.Storage{
+ Name: name,
+ Path: tmpDir,
+ })
+
+ cleanup := func() {
+ config.Config.Storages = oldStorages
+ require.NoError(t, os.RemoveAll(tmpDir))
+ }
+
+ return name, cleanup
+}
+
+func TestLoserCount(t *testing.T) {
+ db := cache.NewStreamDB(cache.LeaseKeyer{})
+
+ // the test can be contaminate by other tests using the cache, so a
+ // dedicated storage location should be used
+ storageName, storageCleanup := injectTempStorage(t)
+ defer storageCleanup()
+
+ req := &gitalypb.InfoRefsRequest{
+ Repository: &gitalypb.Repository{
+ RelativePath: "test",
+ StorageName: storageName,
+ },
+ }
+ ctx := testhelper.SetCtxGrpcMethod(context.Background(), "InfoRefsUploadPack")
+
+ leashes := []chan struct{}{make(chan struct{}), make(chan struct{}), make(chan struct{})}
+ errQ := make(chan error)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(len(leashes))
+
+ // Run streams concurrently for the same repo and request
+ for _, l := range leashes {
+ go func(l chan struct{}) { errQ <- db.PutStream(ctx, req.Repository, req, leashedReader{l, wg}) }(l)
+ l <- struct{}{}
+ }
+
+ wg.Wait()
+
+ start := cache.ExportMockLoserBytes.Count()
+
+ for _, l := range leashes {
+ close(l)
+ require.NoError(t, <-errQ)
+ }
+
+ require.Equal(t, start+len(leashes)-1, cache.ExportMockLoserBytes.Count())
+}
+
+type leashedReader struct {
+ q <-chan struct{}
+ wg *sync.WaitGroup
+}
+
+func (lr leashedReader) Read(p []byte) (n int, err error) {
+ _, ok := <-lr.q
+
+ if !ok {
+ return 0, io.EOF // on channel close
+ }
+
+ lr.wg.Done()
+ lr.wg.Wait() // wait for all other readers to sync
+
+ return 1, nil // on receive, return 1 byte read
+}
diff --git a/internal/cache/export_test.go b/internal/cache/export_test.go
index 287072118..2f1538e45 100644
--- a/internal/cache/export_test.go
+++ b/internal/cache/export_test.go
@@ -5,6 +5,7 @@ import "sync"
var (
ExportMockRemovalCounter = new(mockCounter)
ExportMockCheckCounter = new(mockCounter)
+ ExportMockLoserBytes = new(mockCounter)
ExportDisableMoveAndClear = &disableMoveAndClear
ExportDisableWalker = &disableWalker
@@ -31,4 +32,5 @@ func init() {
// override counter functions with our mocked version
countWalkRemoval = func() { ExportMockRemovalCounter.Add(1) }
countWalkCheck = func() { ExportMockCheckCounter.Add(1) }
+ countLoserBytes = func(n float64) { ExportMockLoserBytes.Add(int(n)) }
}
diff --git a/internal/cache/prometheus.go b/internal/cache/prometheus.go
index f4b7c9a11..16ce368a9 100644
--- a/internal/cache/prometheus.go
+++ b/internal/cache/prometheus.go
@@ -27,6 +27,12 @@ var (
Help: "Total number of disk cache bytes fetched",
},
)
+ bytesLoserTotals = prometheus.NewCounter(
+ prometheus.CounterOpts{
+ Name: "gitaly_diskcache_bytes_loser_total",
+ Help: "Total number of disk cache bytes from losing writes",
+ },
+ )
errTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "gitaly_diskcache_errors_total",
@@ -53,6 +59,7 @@ func init() {
prometheus.MustRegister(missTotals)
prometheus.MustRegister(bytesStoredtotals)
prometheus.MustRegister(bytesFetchedtotals)
+ prometheus.MustRegister(bytesLoserTotals)
prometheus.MustRegister(errTotal)
prometheus.MustRegister(walkerCheckTotal)
prometheus.MustRegister(walkerRemovalTotal)
@@ -73,6 +80,7 @@ var (
countMiss = func() { missTotals.Inc() }
countWriteBytes = func(n float64) { bytesStoredtotals.Add(n) }
countReadBytes = func(n float64) { bytesFetchedtotals.Add(n) }
+ countLoserBytes = func(n float64) { bytesLoserTotals.Add(n) }
countWalkRemoval = func() { walkerRemovalTotal.Inc() }
countWalkCheck = func() { walkerCheckTotal.Inc() }
)