diff options
author | Paul Okstad <pokstad@gitlab.com> | 2019-11-21 18:28:19 +0300 |
---|---|---|
committer | Paul Okstad <pokstad@gitlab.com> | 2019-11-21 18:28:19 +0300 |
commit | 27e6ab351740e99e396d3cc92839f78eb95e7176 (patch) | |
tree | 8095c59c254119edc1dea2baa78d8d5c64fce2c7 | |
parent | 86e6225ecd1cade8965e727da192c210ea10e6b4 (diff) |
Add new Prometheus metrics to investigate write pressure
-rw-r--r-- | internal/cache/cachedb.go | 47 | ||||
-rw-r--r-- | internal/cache/cachedb_test.go | 82 | ||||
-rw-r--r-- | internal/cache/export_test.go | 2 | ||||
-rw-r--r-- | internal/cache/prometheus.go | 8 |
4 files changed, 138 insertions, 1 deletions
diff --git a/internal/cache/cachedb.go b/internal/cache/cachedb.go index 82546f5ce..fc717134a 100644 --- a/internal/cache/cachedb.go +++ b/internal/cache/cachedb.go @@ -6,21 +6,57 @@ import ( "io" "os" "path/filepath" + "sync" "github.com/golang/protobuf/proto" "gitlab.com/gitlab-org/gitaly/internal/safe" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" ) +// maps a cache path to the number of active writers +type activeFiles struct { + *sync.Mutex + m map[string]int +} + +// trackFile returns a function that indicates if the current +// writing of a file is the last known one, which +// would indicate the current write is the "winner". +func (af activeFiles) trackFile(path string) func() bool { + af.Lock() + defer af.Unlock() + + af.m[path]++ + + return func() bool { + af.Lock() + defer af.Unlock() + + af.m[path]-- + + winner := af.m[path] == 0 + if winner { + delete(af.m, path) // reclaim memory + } + + return winner + } +} + // StreamDB stores and retrieves byte streams for repository related RPCs type StreamDB struct { ck Keyer + af activeFiles } // NewStreamDB will open the stream database at the specified file path. func NewStreamDB(ck Keyer) *StreamDB { return &StreamDB{ ck: ck, + af: activeFiles{ + Mutex: &sync.Mutex{}, + m: map[string]int{}, + }, } } @@ -79,6 +115,14 @@ func (sdb *StreamDB) PutStream(ctx context.Context, repo *gitalypb.Repository, r return err } + var n int64 + isWinner := sdb.af.trackFile(reqPath) + defer func() { + if !isWinner() { + countLoserBytes(float64(n)) + } + }() + if err := os.MkdirAll(filepath.Dir(reqPath), 0755); err != nil { return err } @@ -89,13 +133,14 @@ func (sdb *StreamDB) PutStream(ctx context.Context, repo *gitalypb.Repository, r } defer sf.Close() - n, err := io.Copy(sf, src) + n, err = io.Copy(sf, src) if err != nil { return err } countWriteBytes(float64(n)) if err := sf.Commit(); err != nil { + errTotal.WithLabelValues("ErrSafefileCommit").Inc() return err } diff --git a/internal/cache/cachedb_test.go b/internal/cache/cachedb_test.go index 1fe267fa8..f556051cf 100644 --- a/internal/cache/cachedb_test.go +++ b/internal/cache/cachedb_test.go @@ -2,13 +2,18 @@ package cache_test import ( "context" + "io" "io/ioutil" + "os" + "path/filepath" "strings" + "sync" "testing" "time" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/internal/cache" + "gitlab.com/gitlab-org/gitaly/internal/config" "gitlab.com/gitlab-org/gitaly/internal/testhelper" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" ) @@ -107,3 +112,80 @@ func TestStreamDBNaiveKeyer(t *testing.T) { _, err = keyer.StartLease(req1.Repository) require.NoError(t, err) } + +func injectTempStorage(t testing.TB) (string, testhelper.Cleanup) { + oldStorages := config.Config.Storages + tmpDir, err := ioutil.TempDir("", t.Name()) + require.NoError(t, err) + + name := filepath.Base(tmpDir) + config.Config.Storages = append(config.Config.Storages, config.Storage{ + Name: name, + Path: tmpDir, + }) + + cleanup := func() { + config.Config.Storages = oldStorages + require.NoError(t, os.RemoveAll(tmpDir)) + } + + return name, cleanup +} + +func TestLoserCount(t *testing.T) { + db := cache.NewStreamDB(cache.LeaseKeyer{}) + + // the test can be contaminate by other tests using the cache, so a + // dedicated storage location should be used + storageName, storageCleanup := injectTempStorage(t) + defer storageCleanup() + + req := &gitalypb.InfoRefsRequest{ + Repository: &gitalypb.Repository{ + RelativePath: "test", + StorageName: storageName, + }, + } + ctx := testhelper.SetCtxGrpcMethod(context.Background(), "InfoRefsUploadPack") + + leashes := []chan struct{}{make(chan struct{}), make(chan struct{}), make(chan struct{})} + errQ := make(chan error) + + wg := &sync.WaitGroup{} + wg.Add(len(leashes)) + + // Run streams concurrently for the same repo and request + for _, l := range leashes { + go func(l chan struct{}) { errQ <- db.PutStream(ctx, req.Repository, req, leashedReader{l, wg}) }(l) + l <- struct{}{} + } + + wg.Wait() + + start := cache.ExportMockLoserBytes.Count() + + for _, l := range leashes { + close(l) + require.NoError(t, <-errQ) + } + + require.Equal(t, start+len(leashes)-1, cache.ExportMockLoserBytes.Count()) +} + +type leashedReader struct { + q <-chan struct{} + wg *sync.WaitGroup +} + +func (lr leashedReader) Read(p []byte) (n int, err error) { + _, ok := <-lr.q + + if !ok { + return 0, io.EOF // on channel close + } + + lr.wg.Done() + lr.wg.Wait() // wait for all other readers to sync + + return 1, nil // on receive, return 1 byte read +} diff --git a/internal/cache/export_test.go b/internal/cache/export_test.go index 287072118..2f1538e45 100644 --- a/internal/cache/export_test.go +++ b/internal/cache/export_test.go @@ -5,6 +5,7 @@ import "sync" var ( ExportMockRemovalCounter = new(mockCounter) ExportMockCheckCounter = new(mockCounter) + ExportMockLoserBytes = new(mockCounter) ExportDisableMoveAndClear = &disableMoveAndClear ExportDisableWalker = &disableWalker @@ -31,4 +32,5 @@ func init() { // override counter functions with our mocked version countWalkRemoval = func() { ExportMockRemovalCounter.Add(1) } countWalkCheck = func() { ExportMockCheckCounter.Add(1) } + countLoserBytes = func(n float64) { ExportMockLoserBytes.Add(int(n)) } } diff --git a/internal/cache/prometheus.go b/internal/cache/prometheus.go index f4b7c9a11..16ce368a9 100644 --- a/internal/cache/prometheus.go +++ b/internal/cache/prometheus.go @@ -27,6 +27,12 @@ var ( Help: "Total number of disk cache bytes fetched", }, ) + bytesLoserTotals = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "gitaly_diskcache_bytes_loser_total", + Help: "Total number of disk cache bytes from losing writes", + }, + ) errTotal = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "gitaly_diskcache_errors_total", @@ -53,6 +59,7 @@ func init() { prometheus.MustRegister(missTotals) prometheus.MustRegister(bytesStoredtotals) prometheus.MustRegister(bytesFetchedtotals) + prometheus.MustRegister(bytesLoserTotals) prometheus.MustRegister(errTotal) prometheus.MustRegister(walkerCheckTotal) prometheus.MustRegister(walkerRemovalTotal) @@ -73,6 +80,7 @@ var ( countMiss = func() { missTotals.Inc() } countWriteBytes = func(n float64) { bytesStoredtotals.Add(n) } countReadBytes = func(n float64) { bytesFetchedtotals.Add(n) } + countLoserBytes = func(n float64) { bytesLoserTotals.Add(n) } countWalkRemoval = func() { walkerRemovalTotal.Inc() } countWalkCheck = func() { walkerCheckTotal.Inc() } ) |