Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Paul Okstad <pokstad@gitlab.com> 2019-05-30 22:35:56 +0300
committer: Paul Okstad <pokstad@gitlab.com> 2019-06-12 10:04:05 +0300
commit: 122f867b04b24cb3fdfd2e5c2eb7d2662433919c (patch)
tree: f6e34929025316a9b9782c3d705db6f5033c90e7
parent: 5ae2d919b4132890011890679c0005be99ea3387 (diff)
Disk based cache for info-refs (branch: po-disk-cache)
-rw-r--r-- go.mod 2
-rw-r--r-- go.sum 4
-rw-r--r-- internal/cache/cachedb.go 100
-rw-r--r-- internal/cache/cachedb_test.go 66
-rw-r--r-- internal/cache/helpers_test.go 194
-rw-r--r-- internal/cache/keyer.go 176
-rw-r--r-- internal/tempdir/tempdir.go 4
7 files changed, 546 insertions, 0 deletions
diff --git a/go.mod b/go.mod
index 0daf1038a..2124c69d3 100644
--- a/go.mod
+++ b/go.mod
@@ -3,7 +3,9 @@ module gitlab.com/gitlab-org/gitaly
require (
github.com/BurntSushi/toml v0.3.1
github.com/cloudflare/tableflip v0.0.0-20190329062924-8392f1641731
+ github.com/dchest/safefile v0.0.0-20151022103144-855e8d98f185
github.com/getsentry/raven-go v0.1.2
+ github.com/gofrs/flock v0.7.1
github.com/golang/protobuf v1.3.1
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
diff --git a/go.sum b/go.sum
index cc56b0adb..c143d2c04 100644
--- a/go.sum
+++ b/go.sum
@@ -20,6 +20,8 @@ github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
+github.com/dchest/safefile v0.0.0-20151022103144-855e8d98f185 h1:3T8ZyTDp5QxTx3NU48JVb2u+75xc040fofcBaN+6jPA=
+github.com/dchest/safefile v0.0.0-20151022103144-855e8d98f185/go.mod h1:cFRxtTwTOJkz2x3rQUNCYKWC93yP1VKjR8NUhqFxZNU=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/getsentry/raven-go v0.1.0/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ=
@@ -29,6 +31,8 @@ github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
+github.com/gofrs/flock v0.7.1 h1:DP+LD/t0njgoPBvT5MJLeliUIVQR03hiKR6vezdwHlc=
+github.com/gofrs/flock v0.7.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
github.com/gogo/protobuf v1.1.1 h1:72R+M5VuhED/KujmZVcIquuo8mBgX4oVda//DQb3PXo=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
diff --git a/internal/cache/cachedb.go b/internal/cache/cachedb.go
new file mode 100644
index 000000000..2269e208c
--- /dev/null
+++ b/internal/cache/cachedb.go
@@ -0,0 +1,100 @@
+package cache
+
+import (
+ "context"
+ "crypto/sha256"
+ "errors"
+ "io"
+ "log"
+ "os"
+ "path/filepath"
+
+ "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
+
+ "github.com/dchest/safefile"
+ "github.com/gofrs/flock"
+ "github.com/golang/protobuf/proto"
+)
+
+const (
+ // lockFilePerms is the permission mode passed to safefile.Create for the
+ // cache entries and generation files written by this package.
+ lockFilePerms = 0640
+ // lockFileName is not referenced by any code visible in this change.
+ lockFileName = "streamdb.lock"
+)
+
+// StreamDB stores and retrieves byte streams for repository related RPCs
+type StreamDB struct {
+ // root is the path handed to OpenStreamDB. GetStream and PutStream resolve
+ // entry paths through ck and do not read this field.
+ root string
+ // lock is left nil by OpenStreamDB and is unused in the visible code.
+ lock *flock.Flock
+ // ck maps a repository and request to the file path of its cache entry.
+ ck CacheKeyer
+}
+
+// OpenStreamDB returns a StreamDB for the specified root path that resolves
+// cache entry paths with ck. It performs no I/O and the returned error is
+// currently always nil.
+func OpenStreamDB(root string, ck CacheKeyer) (*StreamDB, error) {
+ return &StreamDB{
+ root: root,
+ ck: ck,
+ }, nil
+}
+
+// ErrReqNotFound indicates the request does not exist within the repo digest.
+// NOTE(review): no code visible in this change returns it; GetStream surfaces
+// the os.Open error instead.
+var (
+ ErrReqNotFound = errors.New("request digest not found within repo namespace")
+)
+
+// GetStream opens the cache entry whose path the keyer derives from ctx, repo
+// and req, and returns it for reading. Keyer and os.Open errors are returned
+// unchanged. The caller owns the returned stream and must close it.
+func (sdb *StreamDB) GetStream(ctx context.Context, repo *gitalypb.Repository, req proto.Message) (io.ReadCloser, error) {
+	entryPath, keyErr := sdb.ck.CacheKeyPath(ctx, repo, req)
+	if keyErr != nil {
+		return nil, keyErr
+	}
+	log.Print(entryPath)
+
+	entry, openErr := os.Open(entryPath)
+	if openErr != nil {
+		// Return an untyped nil so the interface result is truly nil rather
+		// than wrapping a nil *os.File.
+		return nil, openErr
+	}
+
+	return entry, nil
+}
+
+// PutStream copies src into the cache entry whose path the keyer derives from
+// ctx, repo and req. Parent directories are created as needed. The data is
+// written through a safefile and only becomes the entry once Commit succeeds;
+// on any earlier failure the deferred Close releases the uncommitted file.
+func (sdb *StreamDB) PutStream(ctx context.Context, repo *gitalypb.Repository, req proto.Message, src io.Reader) error {
+	entryPath, err := sdb.ck.CacheKeyPath(ctx, repo, req)
+	if err != nil {
+		return err
+	}
+
+	if err = os.MkdirAll(filepath.Dir(entryPath), 0755); err != nil {
+		return err
+	}
+
+	entry, err := safefile.Create(entryPath, lockFilePerms)
+	if err != nil {
+		return err
+	}
+	defer entry.Close()
+
+	if _, err = io.Copy(entry, src); err != nil {
+		return err
+	}
+
+	return entry.Commit()
+}
+
+// ProtoSHA256 serializes msg with proto.Marshal and returns the 32-byte SHA256
+// digest of the serialized bytes. A marshalling failure is returned as is.
+func ProtoSHA256(msg proto.Message) ([]byte, error) {
+	serialized, err := proto.Marshal(msg)
+	if err != nil {
+		return nil, err
+	}
+
+	hasher := sha256.New()
+	hasher.Write(serialized) // hash.Hash writes never return an error
+	return hasher.Sum(nil), nil
+}
diff --git a/internal/cache/cachedb_test.go b/internal/cache/cachedb_test.go
new file mode 100644
index 000000000..f86512148
--- /dev/null
+++ b/internal/cache/cachedb_test.go
@@ -0,0 +1,66 @@
+package cache_test
+
+import (
+ "context"
+ "io/ioutil"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
+ "gitlab.com/gitlab-org/gitaly/internal/cache"
+ "gitlab.com/gitlab-org/gitaly/internal/config"
+)
+
+// TestStreamDBNaiveKeyer exercises a StreamDB backed by NaiveKeyer: a lookup
+// before anything is stored must fail, a stored stream must be read back
+// verbatim, and storing again for the same request must replace the content.
+// Every stream obtained from GetStream is closed, as GetStream requires of
+// its callers.
+func TestStreamDBNaiveKeyer(t *testing.T) {
+	keyer := cache.NaiveKeyer{}
+
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+	defer cancel()
+
+	// the keyer reads the gRPC method name from the context
+	ctx = setMockMethodCtx(ctx, "InfoRefsUploadPack")
+
+	db, cleanup := tempDB(t, keyer)
+	defer cleanup()
+
+	req1 := &gitalypb.InfoRefsRequest{
+		Repository: &gitalypb.Repository{
+			RelativePath: "DEADBEEF",
+			StorageName:  config.Config.Storages[0].Name,
+		},
+	}
+
+	_, err := db.GetStream(ctx, req1.Repository, req1)
+	require.Error(t, err, "no streams have been created yet")
+
+	expectStream1 := "this is a very cacheable stream"
+
+	require.NoError(t, keyer.StartCriticalSection(req1.Repository))
+	require.NoError(t, db.PutStream(ctx, req1.Repository, req1, strings.NewReader(expectStream1)))
+	require.NoError(t, keyer.EndCriticalSection(req1.Repository))
+
+	stream1, err := db.GetStream(ctx, req1.Repository, req1)
+	require.NoError(t, err)
+
+	out, err := ioutil.ReadAll(stream1)
+	require.NoError(t, err)
+	require.NoError(t, stream1.Close())
+	require.Equal(t, expectStream1, string(out))
+
+	// invalidate repo:
+	require.NoError(t, keyer.StartCriticalSection(req1.Repository))
+	expectStream2 := "not what you were looking for"
+	require.NoError(t, db.PutStream(ctx, req1.Repository, req1, strings.NewReader(expectStream2)))
+	require.NoError(t, keyer.EndCriticalSection(req1.Repository))
+
+	stream2, err := db.GetStream(ctx, req1.Repository, req1)
+	require.NoError(t, err)
+
+	out2, err := ioutil.ReadAll(stream2)
+	require.NoError(t, err)
+	require.NoError(t, stream2.Close())
+	require.Equal(t, expectStream2, string(out2))
+}
+
+// TestStreamDBMultiProc is an empty placeholder: it makes no assertions and
+// therefore always passes.
+// NOTE(review): presumably meant to drive TestHelper through execTestHelper to
+// test access from several processes — confirm and implement.
+func TestStreamDBMultiProc(t *testing.T) {
+
+}
diff --git a/internal/cache/helpers_test.go b/internal/cache/helpers_test.go
new file mode 100644
index 000000000..761f6f6f0
--- /dev/null
+++ b/internal/cache/helpers_test.go
@@ -0,0 +1,194 @@
+package cache_test
+
+import (
+ "bytes"
+ "context"
+ "errors"
+ "flag"
+ "io"
+ "io/ioutil"
+ "log"
+ "os"
+ "os/exec"
+ "testing"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
+ "gitlab.com/gitlab-org/gitaly/internal/cache"
+ "gitlab.com/gitlab-org/gitaly/internal/config"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/metadata"
+)
+
+const (
+ // helperProcess names the environment variable that, when set to "1",
+ // makes TestHelper act as a helper process instead of returning at once.
+ helperProcess = "GO_WANT_HELPER_PROCESS"
+ // getOp and putOp are the operations TestHelper accepts through its "op"
+ // flag.
+ getOp = "get"
+ putOp = "put"
+)
+
+// TestHelper allows us to create dedicated processes for testing
+// our file locking strategy. The test process will accept command line args to
+// determine the course of action. Depending on the operation, STDIN or STDOUT
+// may be used. Upon error, log.Fatal writes the message to STDERR and exits
+// with status 1.
+// See original inspiration for this test pattern:
+// https://npf.io/2015/06/testing-exec-command/
+func TestHelper(_ *testing.T) {
+ if os.Getenv(helperProcess) != "1" {
+ // wasn't invoked by function execTestHelper
+ return
+ }
+
+ var (
+ fset = flag.NewFlagSet("helper", flag.ExitOnError)
+ // the usage text mentions "invalidate", but the switch below only
+ // handles get and put
+ opFlag = fset.String("op", "", "get|put|invalidate")
+ root = fset.String("root", "", "root dir for stream DBs")
+ repo = fset.String("repo", "", "target repo")
+ req = fset.String("req", "", "request to cache")
+ method = fset.String("method", "", "grpc method to inject into context")
+ )
+
+ // only the arguments after the first "--" belong to the helper; if no "--"
+ // is present the flags keep their defaults
+ for i, arg := range os.Args {
+ if arg == "--" {
+ err := fset.Parse(os.Args[i+1:])
+ if err != nil {
+ log.Fatal(err)
+ }
+ break
+ }
+ }
+
+ db, err := cache.OpenStreamDB(*root, cache.NaiveKeyer{})
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+ defer cancel()
+
+ ctx = setMockMethodCtx(ctx, *method)
+
+ switch *opFlag {
+ case getOp:
+ // get: copy the cached stream to STDOUT
+ stream, err := db.GetStream(ctx, mustDecodeRepo(*repo), mustDecodeReq(*req))
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ _, err = io.Copy(os.Stdout, stream)
+ if err != nil {
+ log.Fatal(err)
+ }
+ case putOp:
+ // put: store everything read from STDIN as the cached stream
+ err := db.PutStream(ctx, mustDecodeRepo(*repo), mustDecodeReq(*req), os.Stdin)
+ if err != nil {
+ log.Fatal(err)
+ }
+ default:
+ log.Fatalf("invalid op: %s", *opFlag)
+ }
+}
+
+func mustDecodeRepo(rawPB string) *gitalypb.Repository {
+ repo := new(gitalypb.Repository)
+ err := proto.Unmarshal([]byte(rawPB), repo)
+ if err != nil {
+ log.Fatal(err)
+ }
+ return repo
+}
+
+func mustDecodeReq(rawPB string) *gitalypb.InfoRefsRequest {
+ req := new(gitalypb.InfoRefsRequest)
+ err := proto.Unmarshal([]byte(rawPB), req)
+ if err != nil {
+ log.Fatal(err)
+ }
+ return req
+}
+
+func execTestHelper(t testing.TB, root, method, op string, stdin io.Reader, repo *gitalypb.Repository, req *gitalypb.InfoRefsRequest) (io.Reader, <-chan error) {
+ args := []string{
+ "-test.run=TestHelper",
+ "--", // no more "go test" args, now our custom args
+ "root=" + root,
+ "op=" + op,
+ "method=" + method,
+ }
+
+ switch op {
+ case getOp:
+ fallthrough
+ case putOp:
+ rawReq, err := proto.Marshal(repo)
+ assert.NoError(t, err)
+
+ rawRepo, err := proto.Marshal(repo)
+ assert.NoError(t, err)
+
+ args = append(args, "req="+string(rawReq), "repo="+string(rawRepo))
+ default:
+ assert.Fail(t, "🤠") // whoa cow poke, are you lost?
+ }
+
+ cmd := exec.Command(os.Args[0], args...)
+ cmd.Env = []string{helperProcess + "=1"}
+ cmd.Stdin = stdin
+
+ pr, pw := io.Pipe()
+ cmd.Stdout = pw
+
+ stderrB := new(bytes.Buffer)
+ cmd.Stderr = stderrB
+
+ errQ := make(chan error)
+ go func() {
+ if err := cmd.Run(); err != nil {
+ errQ <- errors.New(stderrB.String())
+ }
+ }()
+
+ return pr, errQ
+}
+
+// tempDB creates a temporary directory, installs it in config.Config as the
+// only storage (named "default") and opens a StreamDB on it with ck. The
+// returned function logs a `find` listing of the directory, restores the
+// previous config.Config and removes the directory.
+func tempDB(t testing.TB, ck cache.CacheKeyer) (*cache.StreamDB, func()) {
+	dir, err := ioutil.TempDir("", t.Name())
+	assert.NoError(t, err)
+
+	savedCfg := config.Config
+	config.Config.Storages = []config.Storage{{Name: "default", Path: dir}}
+
+	restore := func() {
+		listing, err := exec.Command("find", dir).Output()
+		require.NoError(t, err)
+		log.Print(string(listing))
+		config.Config = savedCfg
+		require.NoError(t, os.RemoveAll(dir))
+	}
+
+	streamDB, err := cache.OpenStreamDB(dir, ck)
+	assert.NoError(t, err)
+
+	return streamDB, restore
+}
+
+// setMockMethodCtx returns a context derived from ctx that carries a mock
+// server transport stream reporting method as its gRPC method name.
+func setMockMethodCtx(ctx context.Context, method string) context.Context {
+	stream := mockServerTransportStream{method: method}
+	return grpc.NewContextWithServerTransportStream(ctx, stream)
+}
+
+// mockServerTransportStream is the stand-in stream that setMockMethodCtx
+// attaches to a context. Only Method carries information; the header and
+// trailer methods do nothing and return nil.
+type mockServerTransportStream struct {
+ // method is the value returned by Method
+ method string
+}
+
+func (msts mockServerTransportStream) Method() string { return msts.method }
+func (mockServerTransportStream) SetHeader(md metadata.MD) error { return nil }
+func (mockServerTransportStream) SendHeader(md metadata.MD) error { return nil }
+func (mockServerTransportStream) SetTrailer(md metadata.MD) error { return nil }
diff --git a/internal/cache/keyer.go b/internal/cache/keyer.go
new file mode 100644
index 000000000..bb1371d89
--- /dev/null
+++ b/internal/cache/keyer.go
@@ -0,0 +1,176 @@
+package cache
+
+import (
+ "context"
+ "crypto/sha256"
+ "encoding/hex"
+ "errors"
+ "fmt"
+ "log"
+ "path/filepath"
+ "sort"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/dchest/safefile"
+ "github.com/golang/protobuf/proto"
+ "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
+ "gitlab.com/gitlab-org/gitaly/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/internal/tempdir"
+ "gitlab.com/gitlab-org/gitaly/internal/version"
+ "google.golang.org/grpc"
+)
+
+const (
+ // how old a cached entry is before the cleanup worker removes it
+ // NOTE(review): no cleanup worker is visible in this change and nothing
+ // here reads staleTimeout.
+ staleTimeout = time.Hour
+ // cacheManagerVersion forms the "v<N>" path component built by cacheDir
+ cacheManagerVersion = 1
+ // genFileSuffix is the file name suffix of generation files; the part
+ // before it is the generation number in hexadecimal
+ genFileSuffix = ".lock"
+)
+
+// CacheKeyer abstracts how to obtain a unique file path key for a request at a
+// specific generation of the cache. The key path is expected to change as new
+// critical sections are declared.
+type CacheKeyer interface {
+ // CacheKeyPath will return a key filepath for the provided request. If an
+ // error is returned, the cache should not be used.
+ CacheKeyPath(context.Context, *gitalypb.Repository, proto.Message) (string, error)
+}
+
+// NaiveKeyer will try to return a key path for the current generation of
+// the repo's cache. It is possible for it to return stale key paths that have
+// been invalidated, but these invalid entries should only be returned within
+// a narrow time window (see staleTimeout). It holds no state.
+type NaiveKeyer struct{}
+
+// StartCriticalSection is a no-op for the naive strategy: it ignores the
+// repository and always returns nil.
+// TODO: determine a safe way to signal the start of a critical section
+func (NaiveKeyer) StartCriticalSection(_ *gitalypb.Repository) error {
+ return nil
+}
+
+// EndCriticalSection ends a critical section for repo by writing an empty
+// generation file into the repo's cache directory. The file is named after the
+// hexadecimal value returned by currentGeneration plus genFileSuffix. Errors
+// from resolving the cache directory, reading the generation, or creating and
+// committing the file are returned unchanged.
+//
+// NOTE(review): the name uses the current generation, not the next one, so
+// once a generation file exists this writes the same name again and the
+// generation does not advance — confirm whether curGen+1 was intended.
+// NOTE(review): an earlier comment promised an error when the generation file
+// already exists; nothing here checks for that — verify against safefile's
+// Commit semantics.
+func (NaiveKeyer) EndCriticalSection(repo *gitalypb.Repository) error {
+	cDir, err := cacheDir(repo)
+	if err != nil {
+		return err
+	}
+
+	curGen, err := currentGeneration(cDir)
+	if err != nil {
+		return err
+	}
+
+	genPath := filepath.Join(cDir, fmt.Sprintf("%x%s", curGen, genFileSuffix))
+	log.Print(genPath)
+
+	genFile, err := safefile.Create(genPath, lockFilePerms)
+	if err != nil {
+		return err
+	}
+	// Release the uncommitted file if Commit fails, the same way PutStream
+	// handles its safefile.
+	defer genFile.Close()
+
+	return genFile.Commit()
+}
+
+// ErrCtxMethodMissing is returned when grpc.Method finds no gRPC method name
+// in the context, which is required to build a cache key.
+var ErrCtxMethodMissing = errors.New("context does not contain gRPC method name")
+
+// CacheKeyPath returns the file path of the cache entry for req: the repo's
+// cache directory joined with the key name computed for the current
+// generation. The context must carry the gRPC method name, otherwise
+// ErrCtxMethodMissing is returned.
+func (NaiveKeyer) CacheKeyPath(ctx context.Context, repo *gitalypb.Repository, req proto.Message) (string, error) {
+	dir, dirErr := cacheDir(repo)
+	if dirErr != nil {
+		return "", dirErr
+	}
+
+	name, nameErr := keyName(ctx, dir, req)
+	if nameErr != nil {
+		return "", nameErr
+	}
+
+	return filepath.Join(dir, name), nil
+}
+
+// cacheDir returns $STORAGE/$CACHE_PREFIX/v$CACHE_VERSION/$REPO_RELPATH, where
+// $STORAGE is the path looked up for the repo's storage name. The lookup error
+// is returned unchanged.
+func cacheDir(repo *gitalypb.Repository) (string, error) {
+	storagePath, err := helper.GetStorageByName(repo.StorageName)
+	if err != nil {
+		return "", err
+	}
+
+	versionDir := "v" + strconv.Itoa(cacheManagerVersion)
+
+	return filepath.Join(storagePath, tempdir.CachePrefix, versionDir, repo.RelativePath), nil
+}
+
+// currentGeneration returns the highest generation number among the
+// generation files (named "<hex>" + genFileSuffix) found in cacheDir.
+//
+// If there are no generation files it returns 1, since that could indicate
+// this is the first generation. Generations are compared numerically: file
+// names cannot be compared as strings because, for example, "9" sorts after
+// "10". A glob error, or any matching file whose name is not a hexadecimal
+// number, yields an error.
+func currentGeneration(cacheDir string) (uint64, error) {
+	genFiles, err := filepath.Glob(filepath.Join(cacheDir, "*"+genFileSuffix))
+	if err != nil {
+		return 0, err
+	}
+
+	if len(genFiles) < 1 {
+		// no generation files could indicate this is the first one
+		return 1, nil
+	}
+
+	gens := make([]uint64, 0, len(genFiles))
+	for _, genFile := range genFiles {
+		genHex := strings.TrimSuffix(filepath.Base(genFile), genFileSuffix)
+
+		gen, err := strconv.ParseUint(genHex, 16, 64)
+		if err != nil {
+			return 0, err
+		}
+
+		gens = append(gens, gen)
+	}
+
+	sort.Slice(gens, func(i, j int) bool { return gens[i] < gens[j] })
+
+	return gens[len(gens)-1], nil
+}
+
+// keyName returns the hex encoded SHA256 sum of the composite key for req. The
+// key is the plain concatenation, in this order, of: the Gitaly version, the
+// gRPC method found in ctx, the current cache generation of cDir in
+// hexadecimal, and the serialized protobuf request. It returns
+// ErrCtxMethodMissing when ctx carries no gRPC method.
+func keyName(ctx context.Context, cDir string, req proto.Message) (string, error) {
+	method, found := grpc.Method(ctx)
+	if !found {
+		return "", ErrCtxMethodMissing
+	}
+
+	reqBytes, err := proto.Marshal(req)
+	if err != nil {
+		return "", err
+	}
+
+	gen, err := currentGeneration(cDir)
+	if err != nil {
+		return "", err
+	}
+
+	parts := [][]byte{
+		[]byte(version.GetVersion()),
+		[]byte(method),
+		[]byte(strconv.FormatUint(gen, 16)),
+		reqBytes,
+	}
+
+	digest := sha256.New()
+	for _, part := range parts {
+		if _, err := digest.Write(part); err != nil {
+			return "", err
+		}
+	}
+
+	return hex.EncodeToString(digest.Sum(nil)), nil
+}
diff --git a/internal/tempdir/tempdir.go b/internal/tempdir/tempdir.go
index faa0bc0b0..0c0f902bd 100644
--- a/internal/tempdir/tempdir.go
+++ b/internal/tempdir/tempdir.go
@@ -27,6 +27,10 @@ const (
// directories.
TmpRootPrefix = GitalyDataPrefix + "/tmp"
+ // CachePrefix is the directory where we store ephemeral disk-based caches
+ // that can be removed at anytime.
+ CachePrefix = GitalyDataPrefix + "/cache"
+
// MaxAge is used by ForDeleteAllRepositories. It is also a fallback
// for the context-scoped temporary directories, to ensure they get
// cleaned up if the cleanup at the end of the context failed to run.