Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Okstad <pokstad@gitlab.com>2019-07-12 15:51:51 +0300
committerJacob Vosmaer <jacob@gitlab.com>2019-07-12 15:51:51 +0300
commit87dec4f884edd1b20d3a4cc9e0a00fd860285db6 (patch)
treefde32fec7ba37df672b89e724939b2b1919785d2
parent06571fb4ba78ba0f5928a99ee65d1ba9be4d6d4a (diff)
Lease-based disk cache strategy
-rw-r--r--internal/cache/cachedb.go102
-rw-r--r--internal/cache/cachedb_test.go119
-rw-r--r--internal/cache/keyer.go303
-rw-r--r--internal/cache/prometheus.go60
-rw-r--r--internal/tempdir/tempdir.go4
5 files changed, 588 insertions, 0 deletions
diff --git a/internal/cache/cachedb.go b/internal/cache/cachedb.go
new file mode 100644
index 000000000..9ba6deb25
--- /dev/null
+++ b/internal/cache/cachedb.go
@@ -0,0 +1,102 @@
+package cache
+
+import (
+ "context"
+ "errors"
+ "io"
+ "os"
+ "path/filepath"
+
+ "github.com/golang/protobuf/proto"
+ "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
+ "gitlab.com/gitlab-org/gitaly/internal/safe"
+)
+
+// StreamDB stores and retrieves byte streams for repository related RPCs
+type StreamDB struct {
+ ck Keyer
+}
+
+// NewStreamDB will open the stream database at the specified file path.
+func NewStreamDB(ck Keyer) *StreamDB {
+ return &StreamDB{
+ ck: ck,
+ }
+}
+
+// ErrReqNotFound indicates the request does not exist within the repo digest
+var ErrReqNotFound = errors.New("request digest not found within repo namespace")
+
+// GetStream will fetch the cached stream for a request. It is the
+// responsibility of the caller to close the stream when done.
+func (sdb *StreamDB) GetStream(ctx context.Context, repo *gitalypb.Repository, req proto.Message) (_ io.ReadCloser, err error) {
+ defer func() {
+ if err != nil {
+ countMiss()
+ }
+ }()
+
+ countRequest()
+
+ respPath, err := sdb.ck.KeyPath(ctx, repo, req)
+ switch {
+ case os.IsNotExist(err):
+ return nil, ErrReqNotFound
+ case err == nil:
+ break
+ default:
+ return nil, err
+ }
+
+ respF, err := os.Open(respPath)
+ switch {
+ case os.IsNotExist(err):
+ return nil, ErrReqNotFound
+ case err == nil:
+ break
+ default:
+ return nil, err
+ }
+
+ return instrumentedReadCloser{respF}, nil
+}
+
+type instrumentedReadCloser struct {
+ io.ReadCloser
+}
+
+func (irc instrumentedReadCloser) Read(p []byte) (n int, err error) {
+ n, err = irc.ReadCloser.Read(p)
+ countReadBytes(float64(n))
+ return
+}
+
+// PutStream will store a stream in a repo-namespace keyed by the digest of the
+// request protobuf message.
+func (sdb *StreamDB) PutStream(ctx context.Context, repo *gitalypb.Repository, req proto.Message, src io.Reader) error {
+ reqPath, err := sdb.ck.KeyPath(ctx, repo, req)
+ if err != nil {
+ return err
+ }
+
+ if err := os.MkdirAll(filepath.Dir(reqPath), 0755); err != nil {
+ return err
+ }
+
+ sf, err := safe.CreateFileWriter(ctx, reqPath)
+ if err != nil {
+ return err
+ }
+
+ n, err := io.Copy(sf, src)
+ if err != nil {
+ return err
+ }
+ countWriteBytes(float64(n))
+
+ if err := sf.Commit(); err != nil {
+ return err
+ }
+
+ return nil
+}
diff --git a/internal/cache/cachedb_test.go b/internal/cache/cachedb_test.go
new file mode 100644
index 000000000..fad20f663
--- /dev/null
+++ b/internal/cache/cachedb_test.go
@@ -0,0 +1,119 @@
+package cache_test
+
+import (
+ "context"
+ "io/ioutil"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
+ "gitlab.com/gitlab-org/gitaly/internal/cache"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/metadata"
+)
+
+func TestStreamDBNaiveKeyer(t *testing.T) {
+ keyer := cache.LeaseKeyer{}
+
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+ defer cancel()
+
+ ctx = setMockMethodCtx(ctx, "InfoRefsUploadPack")
+
+ testRepo1, _, cleanup1 := testhelper.NewTestRepo(t)
+ defer cleanup1()
+
+ testRepo2, _, cleanup2 := testhelper.NewTestRepo(t)
+ defer cleanup2()
+
+ db := cache.NewStreamDB(cache.LeaseKeyer{})
+
+ req1 := &gitalypb.InfoRefsRequest{
+ Repository: testRepo1,
+ }
+ req2 := &gitalypb.InfoRefsRequest{
+ Repository: testRepo2,
+ }
+
+ expectGetMiss := func(req *gitalypb.InfoRefsRequest) {
+ _, err := db.GetStream(ctx, req.Repository, req)
+ require.Equal(t, cache.ErrReqNotFound, err)
+ }
+
+ expectGetHit := func(expectStr string, req *gitalypb.InfoRefsRequest) {
+ actualStream, err := db.GetStream(ctx, req.Repository, req)
+ require.NoError(t, err)
+ actualBytes, err := ioutil.ReadAll(actualStream)
+ require.NoError(t, err)
+ require.Equal(t, expectStr, string(actualBytes))
+ }
+
+ invalidationEvent := func(repo *gitalypb.Repository) {
+ lease, err := keyer.StartLease(repo)
+ require.NoError(t, err)
+ // imagine repo being modified here
+ require.NoError(t, lease.EndLease(ctx))
+ }
+
+ storeAndRetrieve := func(expectStr string, req *gitalypb.InfoRefsRequest) {
+ require.NoError(t, db.PutStream(ctx, req.Repository, req, strings.NewReader(expectStr)))
+ expectGetHit(expectStr, req)
+ }
+
+ // cache is initially empty
+ expectGetMiss(req1)
+ expectGetMiss(req2)
+
+ // populate cache
+ repo1contents := "store and retrieve value in repo 1"
+ storeAndRetrieve(repo1contents, req1)
+ repo2contents := "store and retrieve value in repo 2"
+ storeAndRetrieve(repo2contents, req2)
+
+ // invalidation makes previous value stale and unreachable
+ invalidationEvent(req1.Repository)
+ expectGetMiss(req1)
+ expectGetHit(repo2contents, req2) // repo1 invalidation doesn't affect repo2
+
+ // store new value for same cache value but at new generation
+ expectStream2 := "not what you were looking for"
+ require.NoError(t, db.PutStream(ctx, req1.Repository, req1, strings.NewReader(expectStream2)))
+ expectGetHit(expectStream2, req1)
+
+ // start critical section without closing
+ repo1Lease, err := keyer.StartLease(req1.Repository)
+ require.NoError(t, err)
+
+ // accessing repo cache with open critical section should fail
+ _, err = db.GetStream(ctx, req1.Repository, req1)
+ require.Equal(t, err, cache.ErrPendingExists)
+ err = db.PutStream(ctx, req1.Repository, req1, strings.NewReader(repo1contents))
+ require.Equal(t, err, cache.ErrPendingExists)
+
+ expectGetHit(repo2contents, req2) // other repo caches should be unaffected
+
+ // opening and closing a new critical zone doesn't resolve the issue
+ invalidationEvent(req1.Repository)
+ _, err = db.GetStream(ctx, req1.Repository, req1)
+ require.Equal(t, err, cache.ErrPendingExists)
+
+ // only completing/removing the pending generation file will allow access
+ require.NoError(t, repo1Lease.EndLease(ctx))
+ expectGetMiss(req1)
+}
+
+func setMockMethodCtx(ctx context.Context, method string) context.Context {
+ return grpc.NewContextWithServerTransportStream(ctx, mockServerTransportStream{method})
+}
+
+type mockServerTransportStream struct {
+ method string
+}
+
+func (msts mockServerTransportStream) Method() string { return msts.method }
+func (mockServerTransportStream) SetHeader(md metadata.MD) error { return nil }
+func (mockServerTransportStream) SendHeader(md metadata.MD) error { return nil }
+func (mockServerTransportStream) SetTrailer(md metadata.MD) error { return nil }
diff --git a/internal/cache/keyer.go b/internal/cache/keyer.go
new file mode 100644
index 000000000..c32dac1fe
--- /dev/null
+++ b/internal/cache/keyer.go
@@ -0,0 +1,303 @@
+package cache
+
+import (
+ "context"
+ "crypto/sha256"
+ "encoding/hex"
+ "errors"
+ "fmt"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+ "github.com/google/uuid"
+ "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
+ "gitlab.com/gitlab-org/gitaly/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/internal/safe"
+ "gitlab.com/gitlab-org/gitaly/internal/tempdir"
+ "gitlab.com/gitlab-org/gitaly/internal/version"
+ "google.golang.org/grpc"
+)
+
+var (
+ // ErrMissingLeaseFile indicates a lease file does not exist on the
+ // filesystem that the lease ender expected to be there
+ ErrMissingLeaseFile = errors.New("lease file unexpectedly missing")
+ // ErrInvalidUUID indicates an internal error with generating a UUID
+ ErrInvalidUUID = errors.New("unable to generate valid UUID")
+ // ErrCtxMethodMissing indicates the provided context does not contain the
+ // expected information about the current gRPC method
+ ErrCtxMethodMissing = errors.New("context does not contain gRPC method name")
+ // ErrPendingExists indicates that there is a critical zone for the current
+ // repository in the pending transition
+ ErrPendingExists = errors.New("one or more cache generations are pending transition for the current repository")
+)
+
+// Keyer abstracts how to obtain a unique file path key for a request at a
+// specific generation of the cache. The key path will magically update as new
+// critical sections are declared. An error will be returned if the repo's cache
+// has any open critical sections.
+type Keyer interface {
+ // KeyPath will return a key filepath for the provided request. If an error
+ // is returned, the cache should not be used.
+ KeyPath(context.Context, *gitalypb.Repository, proto.Message) (string, error)
+}
+
+// LeaseKeyer will try to return a key path for the current generation of
+// the repo's cache. It uses a strategy that avoids file locks in favor of
+// atomically created/renamed files. Read more about LeaseKeyer's design:
+// https://gitlab.com/gitlab-org/gitaly/issues/1745
+type LeaseKeyer struct{}
+
+type lease struct {
+ pendingPath string
+ repo *gitalypb.Repository
+}
+
+// EndLease will end the lease by removing the pending lease file and updating
+// the key file with the current lease ID.
+func (l lease) EndLease(ctx context.Context) error {
+ _, err := updateLatest(ctx, l.repo)
+ if err != nil {
+ return err
+ }
+
+ if err := os.Remove(l.pendingPath); err != nil {
+ if os.IsNotExist(err) {
+ return countErr(ErrMissingLeaseFile)
+ }
+ return err
+ }
+
+ return nil
+}
+
+func updateLatest(ctx context.Context, repo *gitalypb.Repository) (string, error) {
+ repoPath, err := helper.GetRepoPath(repo)
+ if err != nil {
+ return "", err
+ }
+
+ lPath := latestPath(repoPath)
+ if err := os.MkdirAll(filepath.Dir(lPath), 0755); err != nil {
+ return "", err
+ }
+
+ latest, err := safe.CreateFileWriter(ctx, lPath)
+ if err != nil {
+ return "", err
+ }
+
+ nextGenID := uuid.New().String()
+ if nextGenID == "" {
+ return "", ErrInvalidUUID
+ }
+
+ if _, err = latest.Write([]byte(nextGenID)); err != nil {
+ return "", err
+ }
+
+ if err := latest.Commit(); err != nil {
+ return "", err
+ }
+
+ return nextGenID, nil
+}
+
+// LeaseEnder allows the caller to indicate when a lease is no longer needed
+type LeaseEnder interface {
+ EndLease(context.Context) error
+}
+
+// StartLease will mark the repository as being in an indeterministic state.
+// This is typically used when modifying the repo, since the cache is not
+// stable until after the modification is complete. A lease object will be
+// returned that allows the caller to signal the end of the lease.
+func (LeaseKeyer) StartLease(repo *gitalypb.Repository) (LeaseEnder, error) {
+ pendingPath, err := newPendingLease(repo)
+ if err != nil {
+ return lease{}, err
+ }
+
+ return lease{
+ pendingPath: pendingPath,
+ repo: repo,
+ }, nil
+}
+
+// staleAge is how old we consider a pending file to be stale before removal
+const staleAge = time.Hour
+
+// KeyPath will attempt to return the unique keypath for a request in the
+// specified repo for the current generation. The context must contain the gRPC
+// method in its values.
+func (LeaseKeyer) KeyPath(ctx context.Context, repo *gitalypb.Repository, req proto.Message) (string, error) {
+ pending, err := currentLeases(repo)
+ if err != nil {
+ return "", err
+ }
+
+ repoPath, err := helper.GetRepoPath(repo)
+ if err != nil {
+ return "", err
+ }
+
+ pDir := pendingDir(repoPath)
+
+ anyValidPending := false
+ for _, p := range pending {
+ if time.Since(p.ModTime()) > staleAge {
+ pPath := filepath.Join(pDir, p.Name())
+ if err := os.Remove(pPath); err != nil && !os.IsNotExist(err) {
+ return "", err
+ }
+ continue
+ }
+ anyValidPending = true
+ }
+
+ if anyValidPending {
+ return "", countErr(ErrPendingExists)
+ }
+
+ genID, err := currentGenID(ctx, repo)
+ if err != nil {
+ return "", err
+ }
+
+ key, err := compositeKeyHashHex(ctx, genID, req)
+ if err != nil {
+ return "", err
+ }
+
+ cDir, err := cacheDir(repo)
+ if err != nil {
+ return "", err
+ }
+
+ return radixPath(cDir, key)
+}
+
+// radixPath is the same directory structure scheme used by git. This scheme
+// allows for the objects to be randomly distributed across folders based on
+// the first 2 hex chars of the key (i.e. 256 possible top level folders).
+func radixPath(root, key string) (string, error) {
+ return filepath.Join(root, key[0:2], key[2:]), nil
+}
+
+func newPendingLease(repo *gitalypb.Repository) (string, error) {
+ repoPath, err := helper.GetRepoPath(repo)
+ if err != nil {
+ return "", err
+ }
+
+ pDir := pendingDir(repoPath)
+ if err := os.MkdirAll(pDir, 0755); err != nil {
+ return "", err
+ }
+
+ f, err := ioutil.TempFile(pDir, "")
+ if err != nil {
+ return "", err
+ }
+
+ if err := f.Close(); err != nil {
+ return "", err
+ }
+
+ return f.Name(), nil
+}
+
+// cacheDir is $STORAGE/+gitaly/cache
+func cacheDir(repo *gitalypb.Repository) (string, error) {
+ storagePath, err := helper.GetStorageByName(repo.StorageName)
+ if err != nil {
+ return "", err
+ }
+
+ absPath := filepath.Join(storagePath, tempdir.CachePrefix)
+
+ return absPath, nil
+}
+
+func currentLeases(repo *gitalypb.Repository) ([]os.FileInfo, error) {
+ repoPath, err := helper.GetRepoPath(repo)
+ if err != nil {
+ return nil, err
+ }
+
+ pendings, err := ioutil.ReadDir(pendingDir(repoPath))
+ switch {
+ case os.IsNotExist(err):
+ // pending files subdir don't exist yet, that's okay
+ break
+ case err == nil:
+ break
+ default:
+ return nil, err
+ }
+
+ return pendings, nil
+}
+
+func currentGenID(ctx context.Context, repo *gitalypb.Repository) (string, error) {
+ repoPath, err := helper.GetRepoPath(repo)
+ if err != nil {
+ return "", err
+ }
+
+ latestBytes, err := ioutil.ReadFile(latestPath(repoPath))
+ switch {
+ case os.IsNotExist(err):
+ // latest file doesn't exist, so create one
+ return updateLatest(ctx, repo)
+ case err == nil:
+ return string(latestBytes), nil
+ default:
+ return "", err
+ }
+}
+
+func stateDir(repoDir string) string { return filepath.Join(repoDir, "state") }
+func pendingDir(repoDir string) string { return filepath.Join(stateDir(repoDir), "pending") }
+func latestPath(repoDir string) string { return filepath.Join(stateDir(repoDir), "latest") }
+
+// compositeKeyHashHex returns a hex encoded string that is a SHA256 hash sum of
+// the composite key made up of the following properties: Gitaly version, gRPC
+// method, repo cache current generation ID, protobuf request
+func compositeKeyHashHex(ctx context.Context, genID string, req proto.Message) (string, error) {
+ method, ok := grpc.Method(ctx)
+ if !ok {
+ return "", ErrCtxMethodMissing
+ }
+
+ reqSum, err := proto.Marshal(req)
+ if err != nil {
+ return "", err
+ }
+
+ h := sha256.New()
+
+ for _, i := range []string{
+ version.GetVersion(),
+ method,
+ genID,
+ string(reqSum),
+ } {
+ _, err := h.Write(prefixLen(i))
+ if err != nil {
+ return "", err
+ }
+ }
+
+ return hex.EncodeToString(h.Sum(nil)), nil
+}
+
+// prefixLen reduces the risk of collisions due to different combinations of
+// concatenated strings producing the same content.
+// e.g. f+oobar and foo+bar concatenate to the same thing: foobar
+func prefixLen(s string) []byte {
+ return []byte(fmt.Sprintf("%08x%s", len(s), s))
+}
diff --git a/internal/cache/prometheus.go b/internal/cache/prometheus.go
new file mode 100644
index 000000000..45672f24b
--- /dev/null
+++ b/internal/cache/prometheus.go
@@ -0,0 +1,60 @@
+package cache
+
+import "github.com/prometheus/client_golang/prometheus"
+
+var (
+ requestTotals = prometheus.NewCounter(
+ prometheus.CounterOpts{
+ Name: "gitaly_diskcache_requests_total",
+ Help: "Total number of disk cache requests",
+ },
+ )
+ missTotals = prometheus.NewCounter(
+ prometheus.CounterOpts{
+ Name: "gitaly_diskcache_miss_total",
+ Help: "Total number of disk cache misses",
+ },
+ )
+ bytesStoredtotals = prometheus.NewCounter(
+ prometheus.CounterOpts{
+ Name: "gitaly_diskcache_bytes_stored_total",
+ Help: "Total number of disk cache bytes stored",
+ },
+ )
+ bytesFetchedtotals = prometheus.NewCounter(
+ prometheus.CounterOpts{
+ Name: "gitaly_diskcache_bytes_fetched_total",
+ Help: "Total number of disk cache bytes fetched",
+ },
+ )
+ errTotal = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: "gitaly_diskcache_errors_total",
+ Help: "Total number of errors encountered by disk cache",
+ },
+ []string{"error"},
+ )
+)
+
+func init() {
+ prometheus.MustRegister(requestTotals)
+ prometheus.MustRegister(missTotals)
+ prometheus.MustRegister(bytesStoredtotals)
+ prometheus.MustRegister(bytesFetchedtotals)
+ prometheus.MustRegister(errTotal)
+}
+
+func countErr(err error) error {
+ switch err {
+ case ErrMissingLeaseFile:
+ errTotal.WithLabelValues("ErrMissingLeaseFile").Inc()
+ case ErrPendingExists:
+ errTotal.WithLabelValues("ErrPendingExists").Inc()
+ }
+ return err
+}
+
+func countRequest() { requestTotals.Inc() }
+func countMiss() { missTotals.Inc() }
+func countWriteBytes(n float64) { bytesStoredtotals.Add(n) }
+func countReadBytes(n float64) { bytesFetchedtotals.Add(n) }
diff --git a/internal/tempdir/tempdir.go b/internal/tempdir/tempdir.go
index faa0bc0b0..cc0b558ca 100644
--- a/internal/tempdir/tempdir.go
+++ b/internal/tempdir/tempdir.go
@@ -27,6 +27,10 @@ const (
// directories.
TmpRootPrefix = GitalyDataPrefix + "/tmp"
+ // CachePrefix is the directory where all cache data is stored on a
+ // storage location.
+ CachePrefix = GitalyDataPrefix + "/cache"
+
// MaxAge is used by ForDeleteAllRepositories. It is also a fallback
// for the context-scoped temporary directories, to ensure they get
// cleaned up if the cleanup at the end of the context failed to run.