diff options
author | Paul Okstad <pokstad@gitlab.com> | 2019-05-30 22:35:56 +0300 |
---|---|---|
committer | Paul Okstad <pokstad@gitlab.com> | 2019-06-12 10:04:05 +0300 |
commit | 122f867b04b24cb3fdfd2e5c2eb7d2662433919c (patch) | |
tree | f6e34929025316a9b9782c3d705db6f5033c90e7 | |
parent | 5ae2d919b4132890011890679c0005be99ea3387 (diff) |
Disk based cache for info-refs (po-disk-cache)
-rw-r--r-- | go.mod | 2 | ||||
-rw-r--r-- | go.sum | 4 | ||||
-rw-r--r-- | internal/cache/cachedb.go | 100 | ||||
-rw-r--r-- | internal/cache/cachedb_test.go | 66 | ||||
-rw-r--r-- | internal/cache/helpers_test.go | 194 | ||||
-rw-r--r-- | internal/cache/keyer.go | 176 | ||||
-rw-r--r-- | internal/tempdir/tempdir.go | 4 |
7 files changed, 546 insertions, 0 deletions
@@ -3,7 +3,9 @@ module gitlab.com/gitlab-org/gitaly require ( github.com/BurntSushi/toml v0.3.1 github.com/cloudflare/tableflip v0.0.0-20190329062924-8392f1641731 + github.com/dchest/safefile v0.0.0-20151022103144-855e8d98f185 github.com/getsentry/raven-go v0.1.2 + github.com/gofrs/flock v0.7.1 github.com/golang/protobuf v1.3.1 github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 @@ -20,6 +20,8 @@ github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= +github.com/dchest/safefile v0.0.0-20151022103144-855e8d98f185 h1:3T8ZyTDp5QxTx3NU48JVb2u+75xc040fofcBaN+6jPA= +github.com/dchest/safefile v0.0.0-20151022103144-855e8d98f185/go.mod h1:cFRxtTwTOJkz2x3rQUNCYKWC93yP1VKjR8NUhqFxZNU= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/getsentry/raven-go v0.1.0/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ= @@ -29,6 +31,8 @@ github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2 github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/gofrs/flock v0.7.1 h1:DP+LD/t0njgoPBvT5MJLeliUIVQR03hiKR6vezdwHlc= +github.com/gofrs/flock v0.7.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= github.com/gogo/protobuf v1.1.1 h1:72R+M5VuhED/KujmZVcIquuo8mBgX4oVda//DQb3PXo= github.com/gogo/protobuf v1.1.1/go.mod 
h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= diff --git a/internal/cache/cachedb.go b/internal/cache/cachedb.go new file mode 100644 index 000000000..2269e208c --- /dev/null +++ b/internal/cache/cachedb.go @@ -0,0 +1,100 @@ +package cache + +import ( + "context" + "crypto/sha256" + "errors" + "io" + "log" + "os" + "path/filepath" + + "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb" + + "github.com/dchest/safefile" + "github.com/gofrs/flock" + "github.com/golang/protobuf/proto" +) + +const ( + lockFilePerms = 0640 + lockFileName = "streamdb.lock" +) + +// StreamDB stores and retrieves byte streams for repository related RPCs +type StreamDB struct { + root string + lock *flock.Flock + ck CacheKeyer +} + +// OpenRepoDB will open the stream database at the specified file path. +func OpenStreamDB(root string, ck CacheKeyer) (*StreamDB, error) { + return &StreamDB{ + root: root, + ck: ck, + }, nil +} + +// ErrRepoNotFound indicates the repo namespace doesn't exist in the cache +// ErrReqNotFound indicates the request does not exist within the repo digest +var ( + ErrReqNotFound = errors.New("request digest not found within repo namespace") +) + +// GetStream will fetch the cached stream for a request. It is the +// responsibility of the caller to close the stream when done. +func (sdb *StreamDB) GetStream(ctx context.Context, repo *gitalypb.Repository, req proto.Message) (io.ReadCloser, error) { + respPath, err := sdb.ck.CacheKeyPath(ctx, repo, req) + if err != nil { + return nil, err + } + log.Print(respPath) + + respF, err := os.Open(respPath) + if err != nil { + return nil, err + } + + return respF, nil +} + +// PutStream will store a stream in a repo-namespace keyed by the digest of the +// request protobuf message. 
+func (sdb *StreamDB) PutStream(ctx context.Context, repo *gitalypb.Repository, req proto.Message, src io.Reader) error { + reqPath, err := sdb.ck.CacheKeyPath(ctx, repo, req) + if err != nil { + return err + } + + if err := os.MkdirAll(filepath.Dir(reqPath), 0755); err != nil { + return err + } + + sf, err := safefile.Create(reqPath, lockFilePerms) + if err != nil { + return err + } + defer sf.Close() + + if _, err := io.Copy(sf, src); err != nil { + return err + } + + if err := sf.Commit(); err != nil { + return err + } + + return nil +} + +// ProtoSHA256 returns a SHA256 digest of a serialized protobuf message +func ProtoSHA256(msg proto.Message) ([]byte, error) { + pb, err := proto.Marshal(msg) + if err != nil { + return nil, err + } + + sum := sha256.Sum256(pb) + return sum[:], nil +} diff --git a/internal/cache/cachedb_test.go b/internal/cache/cachedb_test.go new file mode 100644 index 000000000..f86512148 --- /dev/null +++ b/internal/cache/cachedb_test.go @@ -0,0 +1,66 @@ +package cache_test + +import ( + "context" + "io/ioutil" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb" + "gitlab.com/gitlab-org/gitaly/internal/cache" + "gitlab.com/gitlab-org/gitaly/internal/config" +) + +func TestStreamDBNaiveKeyer(t *testing.T) { + keyer := cache.NaiveKeyer{} + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + ctx = setMockMethodCtx(ctx, "InfoRefsUploadPack") + + db, cleanup := tempDB(t, keyer) + defer cleanup() + + req1 := &gitalypb.InfoRefsRequest{ + Repository: &gitalypb.Repository{ + RelativePath: "DEADBEEF", + StorageName: config.Config.Storages[0].Name, + }, + } + + _, err := db.GetStream(ctx, req1.Repository, req1) + require.Error(t, err, "no streams have been created yet") + + expectStream1 := "this is a very cacheable stream" + + require.NoError(t, keyer.StartCriticalSection(req1.Repository)) + require.NoError(t, db.PutStream(ctx, 
req1.Repository, req1, strings.NewReader(expectStream1))) + require.NoError(t, keyer.EndCriticalSection(req1.Repository)) + + stream1, err := db.GetStream(ctx, req1.Repository, req1) + require.NoError(t, err) + + out, err := ioutil.ReadAll(stream1) + require.NoError(t, err) + require.Equal(t, expectStream1, string(out)) + + // invalidate repo: + require.NoError(t, keyer.StartCriticalSection(req1.Repository)) + expectStream2 := "not what you were looking for" + require.NoError(t, db.PutStream(ctx, req1.Repository, req1, strings.NewReader(expectStream2))) + require.NoError(t, keyer.EndCriticalSection(req1.Repository)) + + stream2, err := db.GetStream(ctx, req1.Repository, req1) + require.NoError(t, err) + + out2, err := ioutil.ReadAll(stream2) + require.NoError(t, err) + require.Equal(t, expectStream2, string(out2)) +} + +func TestStreamDBMultiProc(t *testing.T) { + +} diff --git a/internal/cache/helpers_test.go b/internal/cache/helpers_test.go new file mode 100644 index 000000000..761f6f6f0 --- /dev/null +++ b/internal/cache/helpers_test.go @@ -0,0 +1,194 @@ +package cache_test + +import ( + "bytes" + "context" + "errors" + "flag" + "io" + "io/ioutil" + "log" + "os" + "os/exec" + "testing" + "time" + + "github.com/golang/protobuf/proto" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb" + "gitlab.com/gitlab-org/gitaly/internal/cache" + "gitlab.com/gitlab-org/gitaly/internal/config" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +const ( + helperProcess = "GO_WANT_HELPER_PROCESS" + getOp = "get" + putOp = "put" +) + +// TestHelper allows us to create dedicated processes for testing +// our file locking strategy. The test process will accept command line args to +// determine the course of action. Depending on the operation, STDIN or STDOUT +// may be used. Upon error, exit status 1 will be returned along with error +// messages on STDERR. 
+// See original inspiration for this test pattern: +// https://npf.io/2015/06/testing-exec-command/ +func TestHelper(_ *testing.T) { + if os.Getenv(helperProcess) != "1" { + // wan't invoked by function execTestHelper + return + } + + var ( + fset = flag.NewFlagSet("helper", flag.ExitOnError) + opFlag = fset.String("op", "", "get|put|invalidate") + root = fset.String("root", "", "root dir for stream DBs") + repo = fset.String("repo", "", "target repo") + req = fset.String("req", "", "request to cache") + method = fset.String("method", "", "grpc method to inject into context") + ) + + for i, arg := range os.Args { + if arg == "--" { + err := fset.Parse(os.Args[i+1:]) + if err != nil { + log.Fatal(err) + } + break + } + } + + db, err := cache.OpenStreamDB(*root, cache.NaiveKeyer{}) + if err != nil { + log.Fatal(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + ctx = setMockMethodCtx(ctx, *method) + + switch *opFlag { + case getOp: + stream, err := db.GetStream(ctx, mustDecodeRepo(*repo), mustDecodeReq(*req)) + if err != nil { + log.Fatal(err) + } + + _, err = io.Copy(os.Stdout, stream) + if err != nil { + log.Fatal(err) + } + case putOp: + err := db.PutStream(ctx, mustDecodeRepo(*repo), mustDecodeReq(*req), os.Stdin) + if err != nil { + log.Fatal(err) + } + default: + log.Fatalf("invalid op: %s", *opFlag) + } +} + +func mustDecodeRepo(rawPB string) *gitalypb.Repository { + repo := new(gitalypb.Repository) + err := proto.Unmarshal([]byte(rawPB), repo) + if err != nil { + log.Fatal(err) + } + return repo +} + +func mustDecodeReq(rawPB string) *gitalypb.InfoRefsRequest { + req := new(gitalypb.InfoRefsRequest) + err := proto.Unmarshal([]byte(rawPB), req) + if err != nil { + log.Fatal(err) + } + return req +} + +func execTestHelper(t testing.TB, root, method, op string, stdin io.Reader, repo *gitalypb.Repository, req *gitalypb.InfoRefsRequest) (io.Reader, <-chan error) { + args := []string{ + "-test.run=TestHelper", + 
"--", // no more "go test" args, now our custom args + "root=" + root, + "op=" + op, + "method=" + method, + } + + switch op { + case getOp: + fallthrough + case putOp: + rawReq, err := proto.Marshal(repo) + assert.NoError(t, err) + + rawRepo, err := proto.Marshal(repo) + assert.NoError(t, err) + + args = append(args, "req="+string(rawReq), "repo="+string(rawRepo)) + default: + assert.Fail(t, "🤠") // whoa cow poke, are you lost? + } + + cmd := exec.Command(os.Args[0], args...) + cmd.Env = []string{helperProcess + "=1"} + cmd.Stdin = stdin + + pr, pw := io.Pipe() + cmd.Stdout = pw + + stderrB := new(bytes.Buffer) + cmd.Stderr = stderrB + + errQ := make(chan error) + go func() { + if err := cmd.Run(); err != nil { + errQ <- errors.New(stderrB.String()) + } + }() + + return pr, errQ +} + +func tempDB(t testing.TB, ck cache.CacheKeyer) (*cache.StreamDB, func()) { + root, err := ioutil.TempDir("", t.Name()) + assert.NoError(t, err) + + oldCfg := config.Config + config.Config.Storages = []config.Storage{ + { + Name: "default", + Path: root, + }, + } + + cleanup := func() { + lsOutput, err := exec.Command("find", root).Output() + require.NoError(t, err) + log.Print(string(lsOutput)) + config.Config = oldCfg + require.NoError(t, os.RemoveAll(root)) + } + + db, err := cache.OpenStreamDB(root, ck) + assert.NoError(t, err) + + return db, cleanup +} + +func setMockMethodCtx(ctx context.Context, method string) context.Context { + return grpc.NewContextWithServerTransportStream(ctx, mockServerTransportStream{method}) +} + +type mockServerTransportStream struct { + method string +} + +func (msts mockServerTransportStream) Method() string { return msts.method } +func (mockServerTransportStream) SetHeader(md metadata.MD) error { return nil } +func (mockServerTransportStream) SendHeader(md metadata.MD) error { return nil } +func (mockServerTransportStream) SetTrailer(md metadata.MD) error { return nil } diff --git a/internal/cache/keyer.go b/internal/cache/keyer.go new file mode 
100644 index 000000000..bb1371d89 --- /dev/null +++ b/internal/cache/keyer.go @@ -0,0 +1,176 @@ +package cache + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "errors" + "fmt" + "log" + "path/filepath" + "sort" + "strconv" + "strings" + "time" + + "github.com/dchest/safefile" + "github.com/golang/protobuf/proto" + "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb" + "gitlab.com/gitlab-org/gitaly/internal/helper" + "gitlab.com/gitlab-org/gitaly/internal/tempdir" + "gitlab.com/gitlab-org/gitaly/internal/version" + "google.golang.org/grpc" +) + +const ( + // how old a cached entry is before the cleanup worker removes it + staleTimeout = time.Hour + cacheManagerVersion = 1 + genFileSuffix = ".lock" +) + +// CacheKeyer abstracts how to obtain a unique file path key for a request at a +// specific generation of the cache. The key path will magically update as new +// critical sections are declared. +type CacheKeyer interface { + // Key will return a key filepath for the provided request. If an error is + // returned, the cache should not be used. + CacheKeyPath(context.Context, *gitalypb.Repository, proto.Message) (string, error) +} + +// NaiveKeyer will try to return a key path for the current generation of +// the repo's cache. It is possible for it to return stale key paths that have +// been invalidated, but these invalid entries should only be returned within +// a narrow time window (see staleTimeout) +type NaiveKeyer struct{} + +// StartCriticalSection is a no-op for the naive strategy +// TODO: determine a safe way to signal the start of a critical section +func (NaiveKeyer) StartCriticalSection(_ *gitalypb.Repository) error { + return nil +} + +// EndCriticalSection will try to end a critical section. If a race condition +// occurs where the desired generation file already exists, an error will be +// returned. 
+func (NaiveKeyer) EndCriticalSection(repo *gitalypb.Repository) error { + cDir, err := cacheDir(repo) + if err != nil { + return err + } + + curGen, err := currentGeneration(cDir) + if err != nil { + return err + } + + genPath := filepath.Join(cDir, fmt.Sprintf("%x%s", curGen, genFileSuffix)) + log.Print(genPath) + + genFile, err := safefile.Create(genPath, lockFilePerms) + if err != nil { + return err + } + + if err := genFile.Commit(); err != nil { + return err + } + + return nil +} + +var ErrCtxMethodMissing = errors.New("context does not contain gRPC method name") + +// KeyPath will attempt to return the unique keypath for a request in the +// specified repo for the current generation. The context must contain the gRPC +// method in its values. +func (NaiveKeyer) CacheKeyPath(ctx context.Context, repo *gitalypb.Repository, req proto.Message) (string, error) { + cDir, err := cacheDir(repo) + if err != nil { + return "", err + } + + kName, err := keyName(ctx, cDir, req) + if err != nil { + return "", err + } + + return filepath.Join(cDir, kName), nil +} + +// cacheDir is $STORAGE/$CACHE_PREFIX/$CACHE_VERSION/$REPO_RELPATH +func cacheDir(repo *gitalypb.Repository) (string, error) { + storagePath, err := helper.GetStorageByName(repo.StorageName) + if err != nil { + return "", err + } + + absPath := filepath.Join( + storagePath, + tempdir.CachePrefix, + fmt.Sprintf("v%d", cacheManagerVersion), + repo.RelativePath, + ) + + return absPath, nil +} + +// currentGeneration will attempt to retrieve the latest generation lock +// filename available in the cacheDir +func currentGeneration(cacheDir string) (uint64, error) { + genFiles, err := filepath.Glob(filepath.Join(cacheDir, "*"+genFileSuffix)) + if err != nil { + return 0, err + } + + if len(genFiles) < 1 { + // no generation files could indicate this is the first one + return 1, nil + } + + sort.Strings(genFiles) + latestGenName := filepath.Base(genFiles[len(genFiles)-1]) + latestGenHex := 
strings.TrimSuffix(latestGenName, genFileSuffix) + gen, err := strconv.ParseUint(latestGenHex, 16, 64) + if err != nil { + return 0, err + } + + return gen, nil +} + +// keyName returns a filename that is a SHA256 hash sum of the composite key +// made up of the following properties: Gitaly version, gRPC method, repo cache +// generation, protobuf request +func keyName(ctx context.Context, cDir string, req proto.Message) (string, error) { + method, ok := grpc.Method(ctx) + if !ok { + return "", ErrCtxMethodMissing + } + + reqSum, err := proto.Marshal(req) + if err != nil { + return "", err + } + + curGen, err := currentGeneration(cDir) + if err != nil { + return "", err + } + + h := sha256.New() + + for _, i := range [][]byte{ + []byte(version.GetVersion()), + []byte(method), + []byte(strconv.FormatUint(curGen, 16)), + reqSum, + } { + _, err := h.Write(i) + if err != nil { + return "", err + } + } + + return hex.EncodeToString(h.Sum(nil)), nil +} diff --git a/internal/tempdir/tempdir.go b/internal/tempdir/tempdir.go index faa0bc0b0..0c0f902bd 100644 --- a/internal/tempdir/tempdir.go +++ b/internal/tempdir/tempdir.go @@ -27,6 +27,10 @@ const ( // directories. TmpRootPrefix = GitalyDataPrefix + "/tmp" + // CachePrefix is the directory where we store ephemeral disk-based caches + // that can be removed at anytime. + CachePrefix = GitalyDataPrefix + "/cache" + // MaxAge is used by ForDeleteAllRepositories. It is also a fallback // for the context-scoped temporary directories, to ensure they get // cleaned up if the cleanup at the end of the context failed to run. |