diff options
author | Paul Okstad <pokstad@gitlab.com> | 2019-07-12 15:51:51 +0300 |
---|---|---|
committer | Jacob Vosmaer <jacob@gitlab.com> | 2019-07-12 15:51:51 +0300 |
commit | 87dec4f884edd1b20d3a4cc9e0a00fd860285db6 (patch) | |
tree | fde32fec7ba37df672b89e724939b2b1919785d2 | |
parent | 06571fb4ba78ba0f5928a99ee65d1ba9be4d6d4a (diff) |
Lease-based disk cache strategy
-rw-r--r-- | internal/cache/cachedb.go | 102 | ||||
-rw-r--r-- | internal/cache/cachedb_test.go | 119 | ||||
-rw-r--r-- | internal/cache/keyer.go | 303 | ||||
-rw-r--r-- | internal/cache/prometheus.go | 60 | ||||
-rw-r--r-- | internal/tempdir/tempdir.go | 4 |
5 files changed, 588 insertions, 0 deletions
diff --git a/internal/cache/cachedb.go b/internal/cache/cachedb.go
new file mode 100644
index 000000000..9ba6deb25
--- /dev/null
+++ b/internal/cache/cachedb.go
@@ -0,0 +1,102 @@
+package cache
+
+import (
+	"context"
+	"errors"
+	"io"
+	"os"
+	"path/filepath"
+
+	"github.com/golang/protobuf/proto"
+	"gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
+	"gitlab.com/gitlab-org/gitaly/internal/safe"
+)
+
+// StreamDB stores and retrieves byte streams for repository related RPCs.
+type StreamDB struct {
+	ck Keyer // resolves a request to the file path of its cache entry
+}
+
+// NewStreamDB returns a StreamDB that uses ck to locate cached streams on disk.
+func NewStreamDB(ck Keyer) *StreamDB {
+	return &StreamDB{
+		ck: ck,
+	}
+}
+
+// ErrReqNotFound indicates the request does not exist within the repo digest
+var ErrReqNotFound = errors.New("request digest not found within repo namespace")
+
+// GetStream will fetch the cached stream for a request. It is the
+// responsibility of the caller to close the stream when done. ErrReqNotFound
+// is returned when no entry exists; every error is counted as a cache miss.
+func (sdb *StreamDB) GetStream(ctx context.Context, repo *gitalypb.Repository, req proto.Message) (_ io.ReadCloser, err error) {
+	defer func() {
+		if err != nil {
+			countMiss()
+		}
+	}()
+
+	countRequest()
+
+	respPath, err := sdb.ck.KeyPath(ctx, repo, req)
+	switch {
+	case os.IsNotExist(err):
+		return nil, ErrReqNotFound
+	case err != nil:
+		return nil, err
+	}
+
+	respF, err := os.Open(respPath)
+	switch {
+	case os.IsNotExist(err):
+		return nil, ErrReqNotFound
+	case err != nil:
+		return nil, err
+	}
+
+	return instrumentedReadCloser{respF}, nil
+}
+
+// instrumentedReadCloser wraps a stream to record how many bytes are read.
+type instrumentedReadCloser struct {
+	io.ReadCloser
+}
+
+// Read delegates to the wrapped stream and reports the number of bytes read
+// to countReadBytes, even when the read also returns an error.
+func (irc instrumentedReadCloser) Read(p []byte) (n int, err error) {
+	n, err = irc.ReadCloser.Read(p)
+	countReadBytes(float64(n))
+	return
+}
+
+// PutStream will store a stream in a repo-namespace keyed by the digest of the
+// request protobuf message.
+func (sdb *StreamDB) PutStream(ctx context.Context, repo *gitalypb.Repository, req proto.Message, src io.Reader) error {
+	reqPath, err := sdb.ck.KeyPath(ctx, repo, req)
+	if err != nil {
+		return err
+	}
+
+	if err := os.MkdirAll(filepath.Dir(reqPath), 0755); err != nil {
+		return err
+	}
+
+	// NOTE(review): presumably safe.FileWriter stages the data and only
+	// publishes it at reqPath on Commit; confirm that an abandoned writer
+	// is cleaned up when one of the error paths below returns early.
+	sf, err := safe.CreateFileWriter(ctx, reqPath)
+	if err != nil {
+		return err
+	}
+
+	n, err := io.Copy(sf, src)
+	if err != nil {
+		return err
+	}
+	// bytes are counted once copied, before the commit outcome is known
+	countWriteBytes(float64(n))
+
+	return sf.Commit()
+}
diff --git a/internal/cache/cachedb_test.go b/internal/cache/cachedb_test.go
new file mode 100644
index 000000000..fad20f663
--- /dev/null
+++ b/internal/cache/cachedb_test.go
@@ -0,0 +1,119 @@
+package cache_test
+
+import (
+	"context"
+	"io/ioutil"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
+	"gitlab.com/gitlab-org/gitaly/internal/cache"
+	"gitlab.com/gitlab-org/gitaly/internal/testhelper"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/metadata"
+)
+
+func TestStreamDBNaiveKeyer(t *testing.T) {
+	keyer := cache.LeaseKeyer{}
+
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+	defer cancel()
+	ctx = setMockMethodCtx(ctx, "InfoRefsUploadPack")
+
+	testRepo1, _, cleanup1 := testhelper.NewTestRepo(t)
+	defer cleanup1()
+
+	testRepo2, _, cleanup2 := testhelper.NewTestRepo(t)
+	defer cleanup2()
+
+	db := cache.NewStreamDB(keyer)
+
+	req1 := &gitalypb.InfoRefsRequest{
+		Repository: testRepo1,
+	}
+	req2 := &gitalypb.InfoRefsRequest{
+		Repository: testRepo2,
+	}
+
+	expectGetMiss := func(req *gitalypb.InfoRefsRequest) {
+		_, err := db.GetStream(ctx, req.Repository, req)
+		require.Equal(t, cache.ErrReqNotFound, err)
+	}
+
+	expectGetHit := func(expectStr string, req *gitalypb.InfoRefsRequest) {
+		actualStream, err := db.GetStream(ctx, req.Repository, req)
+		require.NoError(t, err)
+		actualBytes, err := ioutil.ReadAll(actualStream)
+		require.NoError(t, err)
+		require.NoError(t, actualStream.Close())
+		require.Equal(t, expectStr, string(actualBytes))
+	}
+
+	invalidationEvent := func(repo *gitalypb.Repository) {
+		lease, err := keyer.StartLease(repo)
+		require.NoError(t, err)
+		// imagine repo being modified here
+		require.NoError(t, lease.EndLease(ctx))
+	}
+
+	storeAndRetrieve := func(expectStr string, req *gitalypb.InfoRefsRequest) {
+		require.NoError(t, db.PutStream(ctx, req.Repository, req, strings.NewReader(expectStr)))
+		expectGetHit(expectStr, req)
+	}
+
+	// cache is initially empty
+	expectGetMiss(req1)
+	expectGetMiss(req2)
+
+	// populate cache
+	repo1contents := "store and retrieve value in repo 1"
+	storeAndRetrieve(repo1contents, req1)
+	repo2contents := "store and retrieve value in repo 2"
+	storeAndRetrieve(repo2contents, req2)
+
+	// invalidation makes previous value stale and unreachable
+	invalidationEvent(req1.Repository)
+	expectGetMiss(req1)
+	expectGetHit(repo2contents, req2) // repo1 invalidation doesn't affect repo2
+
+	// store new value for same cache value but at new generation
+	expectStream2 := "not what you were looking for"
+	require.NoError(t, db.PutStream(ctx, req1.Repository, req1, strings.NewReader(expectStream2)))
+	expectGetHit(expectStream2, req1)
+
+	// start critical section without closing
+	repo1Lease, err := keyer.StartLease(req1.Repository)
+	require.NoError(t, err)
+
+	// accessing repo cache with open critical section should fail
+	_, err = db.GetStream(ctx, req1.Repository, req1)
+	require.Equal(t, cache.ErrPendingExists, err)
+	err = db.PutStream(ctx, req1.Repository, req1, strings.NewReader(repo1contents))
+	require.Equal(t, cache.ErrPendingExists, err)
+
+	expectGetHit(repo2contents, req2) // other repo caches should be unaffected
+
+	// opening and closing a new critical zone doesn't resolve the issue
+	invalidationEvent(req1.Repository)
+	_, err = db.GetStream(ctx, req1.Repository, req1)
+	require.Equal(t, cache.ErrPendingExists, err)
+
+	// only completing/removing the pending generation file will allow access
+	require.NoError(t, repo1Lease.EndLease(ctx))
+	expectGetMiss(req1)
+}
+
+func setMockMethodCtx(ctx context.Context, method string) context.Context {
+	return grpc.NewContextWithServerTransportStream(ctx, mockServerTransportStream{method})
+}
+
+type mockServerTransportStream struct {
+	method string
+}
+
+func (msts mockServerTransportStream) Method() string { return msts.method }
+func (mockServerTransportStream) SetHeader(md metadata.MD) error { return nil }
+func (mockServerTransportStream) SendHeader(md metadata.MD) error { return nil }
+func (mockServerTransportStream) SetTrailer(md metadata.MD) error { return nil }
diff --git a/internal/cache/keyer.go b/internal/cache/keyer.go
new file mode 100644
index 000000000..c32dac1fe
--- /dev/null
+++ b/internal/cache/keyer.go
@@ -0,0 +1,303 @@
+package cache
+
+import (
+	"context"
+	"crypto/sha256"
+	"encoding/hex"
+	"errors"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"time"
+
+	"github.com/golang/protobuf/proto"
+	"github.com/google/uuid"
+	"gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
+	"gitlab.com/gitlab-org/gitaly/internal/helper"
+	"gitlab.com/gitlab-org/gitaly/internal/safe"
+	"gitlab.com/gitlab-org/gitaly/internal/tempdir"
+	"gitlab.com/gitlab-org/gitaly/internal/version"
+	"google.golang.org/grpc"
+)
+
+var (
+	// ErrMissingLeaseFile indicates a lease file does not exist on the
+	// filesystem that the lease ender expected to be there
+	ErrMissingLeaseFile = errors.New("lease file unexpectedly missing")
+	// ErrInvalidUUID indicates an internal error with generating a UUID
+	ErrInvalidUUID = errors.New("unable to generate valid UUID")
+	// ErrCtxMethodMissing indicates the provided context does not contain the
+	// expected information about the current gRPC method
+	ErrCtxMethodMissing = errors.New("context does not contain gRPC method name")
+	// ErrPendingExists indicates that there is a critical zone for the current
+	// repository in the pending transition
+	ErrPendingExists = errors.New("one or more cache generations are pending transition for the current repository")
+)
+
+// Keyer abstracts how to obtain a unique file path key for a request at a
+// specific generation of the cache. The key path changes as new critical
+// sections are declared and completed. An error will be returned if the
+// repo's cache has any open critical sections.
+type Keyer interface {
+	// KeyPath will return a key filepath for the provided request. If an error
+	// is returned, the cache should not be used.
+	KeyPath(context.Context, *gitalypb.Repository, proto.Message) (string, error)
+}
+
+// LeaseKeyer will try to return a key path for the current generation of
+// the repo's cache. It uses a strategy that avoids file locks in favor of
+// atomically created/renamed files. Read more about LeaseKeyer's design:
+// https://gitlab.com/gitlab-org/gitaly/issues/1745
+type LeaseKeyer struct{}
+
+type lease struct {
+	pendingPath string               // path of this lease's pending file
+	repo        *gitalypb.Repository // repository the lease was started for
+}
+
+// EndLease will end the lease by writing a new generation ID to the latest
+// file and then removing the pending lease file.
+func (l lease) EndLease(ctx context.Context) error {
+	if _, err := updateLatest(ctx, l.repo); err != nil {
+		return err
+	}
+
+	err := os.Remove(l.pendingPath)
+	if os.IsNotExist(err) {
+		return countErr(ErrMissingLeaseFile)
+	}
+	return err
+}
+
+// updateLatest writes a newly generated generation ID to the repo's latest
+// file (creating the state directory if needed) and returns that ID.
+func updateLatest(ctx context.Context, repo *gitalypb.Repository) (string, error) {
+	repoPath, err := helper.GetRepoPath(repo)
+	if err != nil {
+		return "", err
+	}
+
+	// uuid.New would panic on failure; NewRandom lets us return ErrInvalidUUID
+	genUUID, err := uuid.NewRandom()
+	if err != nil {
+		return "", ErrInvalidUUID
+	}
+	nextGenID := genUUID.String()
+
+	lPath := latestPath(repoPath)
+	if err := os.MkdirAll(filepath.Dir(lPath), 0755); err != nil {
+		return "", err
+	}
+
+	latest, err := safe.CreateFileWriter(ctx, lPath)
+	if err != nil {
+		return "", err
+	}
+
+	if _, err = latest.Write([]byte(nextGenID)); err != nil {
+		return "", err
+	}
+
+	if err := latest.Commit(); err != nil {
+		return "", err
+	}
+
+	return nextGenID, nil
+}
+
+// LeaseEnder allows the caller to indicate when a lease is no longer needed.
+type LeaseEnder interface {
+	EndLease(context.Context) error
+}
+
+// StartLease will mark the repository as being in an indeterminate state.
+// This is typically used when modifying the repo, since the cache is not
+// stable until after the modification is complete. A lease object will be
+// returned that allows the caller to signal the end of the lease.
+func (LeaseKeyer) StartLease(repo *gitalypb.Repository) (LeaseEnder, error) {
+	pendingPath, err := newPendingLease(repo)
+	if err != nil {
+		return lease{}, err
+	}
+
+	return lease{
+		pendingPath: pendingPath,
+		repo:        repo,
+	}, nil
+}
+
+// staleAge is how old we consider a pending file to be stale before removal
+const staleAge = time.Hour
+
+// KeyPath will attempt to return the unique keypath for a request in the
+// specified repo for the current generation. The context must contain the gRPC
+// method in its values.
+func (LeaseKeyer) KeyPath(ctx context.Context, repo *gitalypb.Repository, req proto.Message) (string, error) {
+	leases, err := currentLeases(repo)
+	if err != nil {
+		return "", err
+	}
+
+	repoPath, err := helper.GetRepoPath(repo)
+	if err != nil {
+		return "", err
+	}
+
+	// Pending files older than staleAge are removed and ignored; any younger
+	// one means the repo has an open lease and the cache must not be used.
+	leaseDir := pendingDir(repoPath)
+	livePending := false
+	for _, fi := range leases {
+		if time.Since(fi.ModTime()) <= staleAge {
+			livePending = true
+			continue
+		}
+		stalePath := filepath.Join(leaseDir, fi.Name())
+		if err := os.Remove(stalePath); err != nil && !os.IsNotExist(err) {
+			return "", err
+		}
+	}
+	if livePending {
+		return "", countErr(ErrPendingExists)
+	}
+
+	generation, err := currentGenID(ctx, repo)
+	if err != nil {
+		return "", err
+	}
+
+	digest, err := compositeKeyHashHex(ctx, generation, req)
+	if err != nil {
+		return "", err
+	}
+
+	root, err := cacheDir(repo)
+	if err != nil {
+		return "", err
+	}
+
+	return radixPath(root, digest)
+}
+
+// radixPath fans entries out across subdirectories the same way git does for
+// its objects: the first 2 hex chars of the key name the directory (i.e. 256
+// possible top level folders) and the remaining chars name the file.
+func radixPath(root, key string) (string, error) {
+	// slicing below would panic on a key no longer than the 2 char prefix
+	if len(key) <= 2 {
+		return "", fmt.Errorf("cache key %q too short for radix path", key)
+	}
+	return filepath.Join(root, key[0:2], key[2:]), nil
+}
+
+// newPendingLease creates an empty, uniquely named file in the repo's pending
+// directory (creating the directory if needed) and returns the file's path.
+func newPendingLease(repo *gitalypb.Repository) (string, error) {
+	repoPath, err := helper.GetRepoPath(repo)
+	if err != nil {
+		return "", err
+	}
+
+	pDir := pendingDir(repoPath)
+	if err := os.MkdirAll(pDir, 0755); err != nil {
+		return "", err
+	}
+
+	f, err := ioutil.TempFile(pDir, "")
+	if err != nil {
+		return "", err
+	}
+
+	if err := f.Close(); err != nil {
+		return "", err
+	}
+
+	return f.Name(), nil
+}
+
+// cacheDir is $STORAGE/+gitaly/cache
+func cacheDir(repo *gitalypb.Repository) (string, error) {
+	storagePath, err := helper.GetStorageByName(repo.StorageName)
+	if err != nil {
+		return "", err
+	}
+
+	return filepath.Join(storagePath, tempdir.CachePrefix), nil
+}
+
+// currentLeases lists the files in the repo's pending directory.
+func currentLeases(repo *gitalypb.Repository) ([]os.FileInfo, error) {
+	repoPath, err := helper.GetRepoPath(repo)
+	if err != nil {
+		return nil, err
+	}
+
+	pendings, err := ioutil.ReadDir(pendingDir(repoPath))
+	if err != nil && !os.IsNotExist(err) {
+		return nil, err
+	}
+
+	// a pending dir that doesn't exist yet simply yields no leases
+	return pendings, nil
+}
+
+// currentGenID reads the repo's generation ID, creating one if none exists.
+func currentGenID(ctx context.Context, repo *gitalypb.Repository) (string, error) {
+	repoPath, err := helper.GetRepoPath(repo)
+	if err != nil {
+		return "", err
+	}
+
+	latestBytes, err := ioutil.ReadFile(latestPath(repoPath))
+	switch {
+	case os.IsNotExist(err):
+		// latest file doesn't exist, so create one
+		return updateLatest(ctx, repo)
+	case err == nil:
+		return string(latestBytes), nil
+	default:
+		return "", err
+	}
+}
+
+// Locations of the cache state files inside a repository directory.
+func stateDir(repoDir string) string   { return filepath.Join(repoDir, "state") }
+func pendingDir(repoDir string) string { return filepath.Join(stateDir(repoDir), "pending") }
+func latestPath(repoDir string) string { return filepath.Join(stateDir(repoDir), "latest") }
+
+// compositeKeyHashHex returns a hex encoded string that is a SHA256 hash sum of
+// the composite key made up of the following properties: Gitaly version, gRPC
+// method, repo cache current generation ID, protobuf request
+func compositeKeyHashHex(ctx context.Context, genID string, req proto.Message) (string, error) {
+	method, ok := grpc.Method(ctx)
+	if !ok {
+		return "", ErrCtxMethodMissing
+	}
+
+	reqSum, err := proto.Marshal(req)
+	if err != nil {
+		return "", err
+	}
+
+	h := sha256.New()
+	for _, part := range []string{
+		version.GetVersion(),
+		method,
+		genID,
+		string(reqSum),
+	} {
+		if _, err := h.Write(prefixLen(part)); err != nil {
+			return "", err
+		}
+	}
+
+	return hex.EncodeToString(h.Sum(nil)), nil
+}
+
+// prefixLen prepends the length of s as zero padded hex (at least 8 digits).
+// This reduces the risk of different combinations of concatenated strings
+// producing the same content, e.g. f+oobar and foo+bar both give foobar.
+func prefixLen(s string) []byte {
+	return []byte(fmt.Sprintf("%08x%s", len(s), s))
+}
diff --git a/internal/cache/prometheus.go b/internal/cache/prometheus.go
new file mode 100644
index 000000000..45672f24b
--- /dev/null
+++ b/internal/cache/prometheus.go
@@ -0,0 +1,60 @@
+package cache
+
+import "github.com/prometheus/client_golang/prometheus"
+
+// Counters describing disk cache traffic; all of them are registered in init.
+var (
+	requestTotals = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Name: "gitaly_diskcache_requests_total",
+			Help: "Total number of disk cache requests",
+		},
+	)
+	missTotals = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Name: "gitaly_diskcache_miss_total",
+			Help: "Total number of disk cache misses",
+		},
+	)
+	bytesStoredTotals = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Name: "gitaly_diskcache_bytes_stored_total",
+			Help: "Total number of disk cache bytes stored",
+		},
+	)
+	bytesFetchedTotals = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Name: "gitaly_diskcache_bytes_fetched_total",
+			Help: "Total number of disk cache bytes fetched",
+		},
+	)
+	errTotal = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Name: "gitaly_diskcache_errors_total",
+			Help: "Total number of errors encountered by disk cache",
+		},
+		[]string{"error"},
+	)
+)
+
+func init() {
+	prometheus.MustRegister(requestTotals, missTotals, bytesStoredTotals, bytesFetchedTotals, errTotal)
+}
+
+// countErr increments the error counter for the sentinel errors it knows
+// about and returns err unchanged so it can wrap a return statement.
+func countErr(err error) error {
+	switch err {
+	case ErrMissingLeaseFile:
+		errTotal.WithLabelValues("ErrMissingLeaseFile").Inc()
+	case ErrPendingExists:
+		errTotal.WithLabelValues("ErrPendingExists").Inc()
+	}
+	return err
+}
+
+// Thin wrappers that keep the metric variables out of the cache logic.
+func countRequest()              { requestTotals.Inc() }
+func countMiss()                 { missTotals.Inc() }
+func countWriteBytes(n float64)  { bytesStoredTotals.Add(n) }
+func countReadBytes(n float64)   { bytesFetchedTotals.Add(n) }
diff --git a/internal/tempdir/tempdir.go b/internal/tempdir/tempdir.go
index faa0bc0b0..cc0b558ca 100644
--- a/internal/tempdir/tempdir.go
+++ b/internal/tempdir/tempdir.go
@@ -27,6 +27,10 @@ const (
 	// directories.
 	TmpRootPrefix = GitalyDataPrefix + "/tmp"
 
+	// CachePrefix is the directory where all cache data is stored on a
+	// storage location.
+	CachePrefix = GitalyDataPrefix + "/cache"
+
 	// MaxAge is used by ForDeleteAllRepositories. It is also a fallback
 	// for the context-scoped temporary directories, to ensure they get
 	// cleaned up if the cleanup at the end of the context failed to run. |