Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJacob Vosmaer <jacob@gitlab.com>2019-08-20 19:12:20 +0300
committerJacob Vosmaer <jacob@gitlab.com>2019-08-20 19:12:20 +0300
commit66ab3b2a60e9cd1802e8ae4664934f551d0ff750 (patch)
tree705ca5f7fed8be71602b7408732f5f4cd904e5a6
parent94bafde880fad09b849c8ac4b73ca20acf840837 (diff)
parente9371d862cc100fd21546ab6a5cd65683b03d030 (diff)
Merge branch 'po-cache-inforefs' into 'master'
InfoRefsUploadPack disk cache See merge request gitlab-org/gitaly!1366
-rw-r--r--internal/cache/cachedb_test.go17
-rw-r--r--internal/service/smarthttp/cache.go104
-rw-r--r--internal/service/smarthttp/inforefs.go12
-rw-r--r--internal/service/smarthttp/inforefs_test.go131
-rw-r--r--internal/testhelper/grpc.go25
-rw-r--r--internal/testhelper/grpc_test.go19
6 files changed, 286 insertions, 22 deletions
diff --git a/internal/cache/cachedb_test.go b/internal/cache/cachedb_test.go
index 71e29f01c..1f006aedb 100644
--- a/internal/cache/cachedb_test.go
+++ b/internal/cache/cachedb_test.go
@@ -11,8 +11,6 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/cache"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
- "google.golang.org/grpc"
- "google.golang.org/grpc/metadata"
)
func TestStreamDBNaiveKeyer(t *testing.T) {
@@ -21,7 +19,7 @@ func TestStreamDBNaiveKeyer(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
- ctx = setMockMethodCtx(ctx, "InfoRefsUploadPack")
+ ctx = testhelper.SetCtxGrpcMethod(ctx, "InfoRefsUploadPack")
testRepo1, _, cleanup1 := testhelper.NewTestRepo(t)
defer cleanup1()
@@ -104,16 +102,3 @@ func TestStreamDBNaiveKeyer(t *testing.T) {
require.NoError(t, repo1Lease.EndLease(ctx))
expectGetMiss(req1)
}
-
-func setMockMethodCtx(ctx context.Context, method string) context.Context {
- return grpc.NewContextWithServerTransportStream(ctx, mockServerTransportStream{method})
-}
-
-type mockServerTransportStream struct {
- method string
-}
-
-func (msts mockServerTransportStream) Method() string { return msts.method }
-func (mockServerTransportStream) SetHeader(md metadata.MD) error { return nil }
-func (mockServerTransportStream) SendHeader(md metadata.MD) error { return nil }
-func (mockServerTransportStream) SetTrailer(md metadata.MD) error { return nil }
diff --git a/internal/service/smarthttp/cache.go b/internal/service/smarthttp/cache.go
new file mode 100644
index 000000000..8cb864bdd
--- /dev/null
+++ b/internal/service/smarthttp/cache.go
@@ -0,0 +1,104 @@
+package smarthttp
+
+import (
+ "context"
+ "io"
+ "sync"
+
+ grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
+ "github.com/prometheus/client_golang/prometheus"
+ log "github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/internal/cache"
+ "gitlab.com/gitlab-org/gitaly/internal/metadata/featureflag"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+var (
+ infoRefCache = cache.NewStreamDB(cache.LeaseKeyer{})
+
+ // prometheus counters
+ cacheAttemptTotal = prometheus.NewCounter(
+ prometheus.CounterOpts{
+ Name: "gitaly_inforef_cache_attempt_total",
+ Help: "Total number of smarthttp info-ref RPCs accessing the cache",
+ },
+ )
+ hitMissTotals = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: "gitaly_inforef_cache_hit_miss_total",
+ Help: "Total number of smarthttp info-ref RPCs accessing the cache",
+ },
+ []string{"type"},
+ )
+
+ // counter functions are package vars to enable easy overriding for tests
+ countAttempt = func() { cacheAttemptTotal.Inc() }
+ countHit = func() { hitMissTotals.WithLabelValues("hit").Inc() }
+ countMiss = func() { hitMissTotals.WithLabelValues("miss").Inc() }
+ countErr = func() { hitMissTotals.WithLabelValues("err").Inc() }
+)
+
+func init() {
+ prometheus.MustRegister(cacheAttemptTotal)
+ prometheus.MustRegister(hitMissTotals)
+}
+
+// UploadPackCacheFeatureFlagKey enables cache usage in InfoRefsUploadPack RPC
+const UploadPackCacheFeatureFlagKey = "inforef-uploadpack-cache"
+
+func tryCache(ctx context.Context, in *gitalypb.InfoRefsRequest, w io.Writer, missFn func(io.Writer) error) error {
+ if !featureflag.IsEnabled(ctx, UploadPackCacheFeatureFlagKey) ||
+ len(in.GetGitConfigOptions()) > 0 ||
+ len(in.GetGitProtocol()) > 0 {
+ return missFn(w)
+ }
+
+ logger := grpc_logrus.Extract(ctx).WithFields(log.Fields{"service": uploadPackSvc})
+ logger.Debug("Attempting to fetch cached response")
+ countAttempt()
+
+ stream, err := infoRefCache.GetStream(ctx, in.GetRepository(), in)
+ switch err {
+
+ case nil:
+ countHit()
+ logger.Info("cache hit for UploadPack response")
+
+ if _, err := io.Copy(w, stream); err != nil {
+ return status.Errorf(codes.Internal, "GetInfoRefs: cache copy: %v", err)
+ }
+
+ return nil
+
+ case cache.ErrReqNotFound:
+ countMiss()
+ logger.Info("cache miss for UploadPack response")
+
+ var wg sync.WaitGroup
+ defer wg.Wait()
+
+ pr, pw := io.Pipe()
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+
+ tr := io.TeeReader(pr, w)
+ if err := infoRefCache.PutStream(ctx, in.Repository, in, tr); err != nil {
+ logger.Errorf("unable to store info-ref response in cache: %q", err)
+ }
+ }()
+
+ err = missFn(pw)
+ _ = pw.CloseWithError(err) // always returns nil
+ return err
+
+ default:
+ countErr()
+ logger.Infof("unable to fetch cached response: %q", err)
+
+ return missFn(w)
+ }
+}
diff --git a/internal/service/smarthttp/inforefs.go b/internal/service/smarthttp/inforefs.go
index 0070173ae..bff368250 100644
--- a/internal/service/smarthttp/inforefs.go
+++ b/internal/service/smarthttp/inforefs.go
@@ -16,18 +16,26 @@ import (
"google.golang.org/grpc/status"
)
+const (
+ uploadPackSvc = "upload-pack"
+ receivePackSvc = "receive-pack"
+)
+
func (s *server) InfoRefsUploadPack(in *gitalypb.InfoRefsRequest, stream gitalypb.SmartHTTPService_InfoRefsUploadPackServer) error {
w := streamio.NewWriter(func(p []byte) error {
return stream.Send(&gitalypb.InfoRefsResponse{Data: p})
})
- return handleInfoRefs(stream.Context(), "upload-pack", in, w)
+
+ return tryCache(stream.Context(), in, w, func(w io.Writer) error {
+ return handleInfoRefs(stream.Context(), uploadPackSvc, in, w)
+ })
}
func (s *server) InfoRefsReceivePack(in *gitalypb.InfoRefsRequest, stream gitalypb.SmartHTTPService_InfoRefsReceivePackServer) error {
w := streamio.NewWriter(func(p []byte) error {
return stream.Send(&gitalypb.InfoRefsResponse{Data: p})
})
- return handleInfoRefs(stream.Context(), "receive-pack", in, w)
+ return handleInfoRefs(stream.Context(), receivePackSvc, in, w)
}
func handleInfoRefs(ctx context.Context, service string, req *gitalypb.InfoRefsRequest, w io.Writer) error {
diff --git a/internal/service/smarthttp/inforefs_test.go b/internal/service/smarthttp/inforefs_test.go
index 945f4b2f2..0237a6f32 100644
--- a/internal/service/smarthttp/inforefs_test.go
+++ b/internal/service/smarthttp/inforefs_test.go
@@ -5,16 +5,24 @@ import (
"fmt"
"io"
"io/ioutil"
+ "os"
+ "path/filepath"
"strings"
"testing"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/cache"
+ "gitlab.com/gitlab-org/gitaly/internal/config"
"gitlab.com/gitlab-org/gitaly/internal/git"
"gitlab.com/gitlab-org/gitaly/internal/git/objectpool"
+ "gitlab.com/gitlab-org/gitaly/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/internal/metadata/featureflag"
+ "gitlab.com/gitlab-org/gitaly/internal/tempdir"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/streamio"
"google.golang.org/grpc/codes"
+ "google.golang.org/grpc/metadata"
)
func TestSuccessfulInfoRefsUploadPack(t *testing.T) {
@@ -26,7 +34,7 @@ func TestSuccessfulInfoRefsUploadPack(t *testing.T) {
rpcRequest := &gitalypb.InfoRefsRequest{Repository: testRepo}
- response, err := makeInfoRefsUploadPackRequest(t, serverSocketPath, rpcRequest)
+ response, err := makeInfoRefsUploadPackRequest(context.Background(), t, serverSocketPath, rpcRequest)
require.NoError(t, err)
assertGitRefAdvertisement(t, "InfoRefsUploadPack", string(response), "001e# service=git-upload-pack", "0000", []string{
"003ef4e6814c3e4e7a0de82a9e7cd20c626cc963a2f8 refs/tags/v1.0.0",
@@ -48,7 +56,7 @@ func TestSuccessfulInfoRefsUploadPackWithGitConfigOptions(t *testing.T) {
GitConfigOptions: []string{"transfer.hideRefs=refs"},
}
- response, err := makeInfoRefsUploadPackRequest(t, serverSocketPath, rpcRequest)
+ response, err := makeInfoRefsUploadPackRequest(context.Background(), t, serverSocketPath, rpcRequest)
require.NoError(t, err)
assertGitRefAdvertisement(t, "InfoRefsUploadPack", string(response), "001e# service=git-upload-pack", "0000", []string{})
}
@@ -90,11 +98,11 @@ func TestSuccessfulInfoRefsUploadPackWithGitProtocol(t *testing.T) {
require.Equal(t, fmt.Sprintf("GIT_PROTOCOL=%s\n", git.ProtocolV2), envData)
}
-func makeInfoRefsUploadPackRequest(t *testing.T, serverSocketPath string, rpcRequest *gitalypb.InfoRefsRequest) ([]byte, error) {
+func makeInfoRefsUploadPackRequest(ctx context.Context, t *testing.T, serverSocketPath string, rpcRequest *gitalypb.InfoRefsRequest) ([]byte, error) {
client, conn := newSmartHTTPClient(t, serverSocketPath)
defer conn.Close()
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context.WithCancel(ctx)
defer cancel()
c, err := client.InfoRefsUploadPack(ctx, rpcRequest)
require.NoError(t, err)
@@ -238,3 +246,118 @@ func assertGitRefAdvertisement(t *testing.T, rpc, responseBody string, firstLine
}
}
}
+
+func TestCacheInfoRefsUploadPack(t *testing.T) {
+ clearCache(t)
+
+ server, serverSocketPath := runSmartHTTPServer(t)
+ defer server.Stop()
+
+ testRepo, _, cleanupFn := testhelper.NewTestRepo(t)
+ defer cleanupFn()
+
+ rpcRequest := &gitalypb.InfoRefsRequest{Repository: testRepo}
+
+ ctx := context.Background()
+
+ assertNormalResponse := func() {
+ response, err := makeInfoRefsUploadPackRequest(ctx, t, serverSocketPath, rpcRequest)
+ require.NoError(t, err)
+
+ assertGitRefAdvertisement(t, "InfoRefsUploadPack", string(response),
+ "001e# service=git-upload-pack", "0000",
+ []string{
+ "003ef4e6814c3e4e7a0de82a9e7cd20c626cc963a2f8 refs/tags/v1.0.0",
+ "00416f6d7e7ed97bb5f0054f2b1df789b39ca89b6ff9 refs/tags/v1.0.0^{}",
+ },
+ )
+ }
+
+ // if feature-flag is disabled, we should not find a cached response
+ assertNormalResponse()
+ testhelper.AssertFileNotExists(t, pathToCachedResponse(t, rpcRequest))
+
+ // enable feature flag, and we expect to find the cached response
+ ctx = enableCacheFeatureFlag(ctx)
+ assertNormalResponse()
+ require.FileExists(t, pathToCachedResponse(t, rpcRequest))
+
+ replacedContents := []string{
+ "first line",
+ "meow meow meow meow",
+ "woof woof woof woof",
+ "last line",
+ }
+
+ // replace cached response file to prove the info-ref uses the cache
+ replaceCachedResponse(t, rpcRequest, strings.Join(replacedContents, "\n"))
+ response, err := makeInfoRefsUploadPackRequest(ctx, t, serverSocketPath, rpcRequest)
+ require.NoError(t, err)
+ assertGitRefAdvertisement(t, "InfoRefsUploadPack", string(response),
+ replacedContents[0], replacedContents[3], replacedContents[1:3],
+ )
+
+ // disable feature-flag to show replaced response no longer used
+ ctx = context.Background()
+ assertNormalResponse()
+
+ // invalidate cache for repository
+ ender, err := cache.LeaseKeyer{}.StartLease(rpcRequest.Repository)
+ require.NoError(t, err)
+ require.NoError(t, ender.EndLease(setInfoRefsUploadPackMethod(context.Background())))
+
+ // replaced cache response is no longer valid
+ ctx = enableCacheFeatureFlag(ctx)
+ assertNormalResponse()
+
+ // failed requests should not cache response
+ invalidReq := &gitalypb.InfoRefsRequest{
+ Repository: &gitalypb.Repository{
+ RelativePath: "fake_repo",
+ StorageName: testRepo.StorageName,
+ },
+ } // invalid request because repo is empty
+ invalidRepoCleanup := createInvalidRepo(t, invalidReq.Repository)
+ defer invalidRepoCleanup()
+
+ _, err = makeInfoRefsUploadPackRequest(ctx, t, serverSocketPath, invalidReq)
+ testhelper.RequireGrpcError(t, err, codes.Internal)
+ testhelper.AssertFileNotExists(t, pathToCachedResponse(t, invalidReq))
+}
+
+func createInvalidRepo(t testing.TB, repo *gitalypb.Repository) func() {
+ repoDir, err := helper.GetPath(repo)
+ require.NoError(t, err)
+ for _, subDir := range []string{"objects", "refs", "HEAD"} {
+ require.NoError(t, os.MkdirAll(filepath.Join(repoDir, subDir), 0755))
+ }
+ return func() { require.NoError(t, os.RemoveAll(repoDir)) }
+}
+
+func replaceCachedResponse(t testing.TB, req *gitalypb.InfoRefsRequest, newContents string) {
+ path := pathToCachedResponse(t, req)
+ require.NoError(t, ioutil.WriteFile(path, []byte(newContents), 0644))
+}
+
+func enableCacheFeatureFlag(ctx context.Context) context.Context {
+ return metadata.NewOutgoingContext(ctx, metadata.New(map[string]string{
+ featureflag.HeaderKey(UploadPackCacheFeatureFlagKey): "true",
+ }))
+}
+
+func clearCache(t testing.TB) {
+ for _, storage := range config.Config.Storages {
+ require.NoError(t, os.RemoveAll(tempdir.CacheDir(storage)))
+ }
+}
+
+func setInfoRefsUploadPackMethod(ctx context.Context) context.Context {
+ return testhelper.SetCtxGrpcMethod(ctx, "/gitaly.SmartHTTPService/InfoRefsUploadPack")
+}
+
+func pathToCachedResponse(t testing.TB, req *gitalypb.InfoRefsRequest) string {
+ ctx := setInfoRefsUploadPackMethod(context.Background())
+ path, err := cache.LeaseKeyer{}.KeyPath(ctx, req.GetRepository(), req)
+ require.NoError(t, err)
+ return path
+}
diff --git a/internal/testhelper/grpc.go b/internal/testhelper/grpc.go
new file mode 100644
index 000000000..dc28cc37b
--- /dev/null
+++ b/internal/testhelper/grpc.go
@@ -0,0 +1,25 @@
+package testhelper
+
+import (
+ "context"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/metadata"
+)
+
+// SetCtxGrpcMethod will set the gRPC context value for the proper key
+// responsible for an RPC full method name. This directly corresponds to the
+// gRPC function responsible for extracting the method:
+// https://godoc.org/google.golang.org/grpc#Method
+func SetCtxGrpcMethod(ctx context.Context, method string) context.Context {
+ return grpc.NewContextWithServerTransportStream(ctx, mockServerTransportStream{method})
+}
+
+type mockServerTransportStream struct {
+ method string
+}
+
+func (msts mockServerTransportStream) Method() string { return msts.method }
+func (mockServerTransportStream) SetHeader(md metadata.MD) error { return nil }
+func (mockServerTransportStream) SendHeader(md metadata.MD) error { return nil }
+func (mockServerTransportStream) SetTrailer(md metadata.MD) error { return nil }
diff --git a/internal/testhelper/grpc_test.go b/internal/testhelper/grpc_test.go
new file mode 100644
index 000000000..792d7d11a
--- /dev/null
+++ b/internal/testhelper/grpc_test.go
@@ -0,0 +1,19 @@
+package testhelper_test
+
+import (
+ "context"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+ "google.golang.org/grpc"
+)
+
+func TestSetCtxGrpcMethod(t *testing.T) {
+ expectFullMethodName := "/pinkypb/TakeOverTheWorld.SNARF"
+ ctx := testhelper.SetCtxGrpcMethod(context.Background(), expectFullMethodName)
+
+ actualFullMethodName, ok := grpc.Method(ctx)
+ require.True(t, ok, "expected context to contain server transport stream")
+ require.Equal(t, expectFullMethodName, actualFullMethodName)
+}