diff options
author | Jacob Vosmaer <jacob@gitlab.com> | 2019-08-20 19:12:20 +0300 |
---|---|---|
committer | Jacob Vosmaer <jacob@gitlab.com> | 2019-08-20 19:12:20 +0300 |
commit | 66ab3b2a60e9cd1802e8ae4664934f551d0ff750 (patch) | |
tree | 705ca5f7fed8be71602b7408732f5f4cd904e5a6 | |
parent | 94bafde880fad09b849c8ac4b73ca20acf840837 (diff) | |
parent | e9371d862cc100fd21546ab6a5cd65683b03d030 (diff) |
Merge branch 'po-cache-inforefs' into 'master'
InfoRefsUploadPack disk cache
See merge request gitlab-org/gitaly!1366
-rw-r--r-- | internal/cache/cachedb_test.go | 17 | ||||
-rw-r--r-- | internal/service/smarthttp/cache.go | 104 | ||||
-rw-r--r-- | internal/service/smarthttp/inforefs.go | 12 | ||||
-rw-r--r-- | internal/service/smarthttp/inforefs_test.go | 131 | ||||
-rw-r--r-- | internal/testhelper/grpc.go | 25 | ||||
-rw-r--r-- | internal/testhelper/grpc_test.go | 19 |
6 files changed, 286 insertions, 22 deletions
diff --git a/internal/cache/cachedb_test.go b/internal/cache/cachedb_test.go index 71e29f01c..1f006aedb 100644 --- a/internal/cache/cachedb_test.go +++ b/internal/cache/cachedb_test.go @@ -11,8 +11,6 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/cache" "gitlab.com/gitlab-org/gitaly/internal/testhelper" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" - "google.golang.org/grpc" - "google.golang.org/grpc/metadata" ) func TestStreamDBNaiveKeyer(t *testing.T) { @@ -21,7 +19,7 @@ func TestStreamDBNaiveKeyer(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - ctx = setMockMethodCtx(ctx, "InfoRefsUploadPack") + ctx = testhelper.SetCtxGrpcMethod(ctx, "InfoRefsUploadPack") testRepo1, _, cleanup1 := testhelper.NewTestRepo(t) defer cleanup1() @@ -104,16 +102,3 @@ func TestStreamDBNaiveKeyer(t *testing.T) { require.NoError(t, repo1Lease.EndLease(ctx)) expectGetMiss(req1) } - -func setMockMethodCtx(ctx context.Context, method string) context.Context { - return grpc.NewContextWithServerTransportStream(ctx, mockServerTransportStream{method}) -} - -type mockServerTransportStream struct { - method string -} - -func (msts mockServerTransportStream) Method() string { return msts.method } -func (mockServerTransportStream) SetHeader(md metadata.MD) error { return nil } -func (mockServerTransportStream) SendHeader(md metadata.MD) error { return nil } -func (mockServerTransportStream) SetTrailer(md metadata.MD) error { return nil } diff --git a/internal/service/smarthttp/cache.go b/internal/service/smarthttp/cache.go new file mode 100644 index 000000000..8cb864bdd --- /dev/null +++ b/internal/service/smarthttp/cache.go @@ -0,0 +1,104 @@ +package smarthttp + +import ( + "context" + "io" + "sync" + + grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" + "github.com/prometheus/client_golang/prometheus" + log "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/internal/cache" + "gitlab.com/gitlab-org/gitaly/internal/metadata/featureflag" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +var ( + infoRefCache = cache.NewStreamDB(cache.LeaseKeyer{}) + + // prometheus counters + cacheAttemptTotal = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "gitaly_inforef_cache_attempt_total", + Help: "Total number of smarthttp info-ref RPCs accessing the cache", + }, + ) + hitMissTotals = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "gitaly_inforef_cache_hit_miss_total", + Help: "Total number of smarthttp info-ref RPCs accessing the cache", + }, + []string{"type"}, + ) + + // counter functions are package vars to enable easy overriding for tests + countAttempt = func() { cacheAttemptTotal.Inc() } + countHit = func() { hitMissTotals.WithLabelValues("hit").Inc() } + countMiss = func() { hitMissTotals.WithLabelValues("miss").Inc() } + countErr = func() { hitMissTotals.WithLabelValues("err").Inc() } +) + +func init() { + prometheus.MustRegister(cacheAttemptTotal) + prometheus.MustRegister(hitMissTotals) +} + +// UploadPackCacheFeatureFlagKey enables cache usage in InfoRefsUploadPack RPC +const UploadPackCacheFeatureFlagKey = "inforef-uploadpack-cache" + +func tryCache(ctx context.Context, in *gitalypb.InfoRefsRequest, w io.Writer, missFn func(io.Writer) error) error { + if !featureflag.IsEnabled(ctx, UploadPackCacheFeatureFlagKey) || + len(in.GetGitConfigOptions()) > 0 || + len(in.GetGitProtocol()) > 0 { + return missFn(w) + } + + logger := grpc_logrus.Extract(ctx).WithFields(log.Fields{"service": uploadPackSvc}) + logger.Debug("Attempting to fetch cached response") + countAttempt() + + stream, err := infoRefCache.GetStream(ctx, in.GetRepository(), in) + switch err { + + case nil: + countHit() + logger.Info("cache hit for UploadPack response") + + if _, err := io.Copy(w, stream); err != nil { + return status.Errorf(codes.Internal, "GetInfoRefs: cache copy: %v", err) + } + + return nil + + case cache.ErrReqNotFound: + countMiss() + logger.Info("cache miss for UploadPack response") + + var wg sync.WaitGroup + defer wg.Wait() + + pr, pw := io.Pipe() + + wg.Add(1) + go func() { + defer wg.Done() + + tr := io.TeeReader(pr, w) + if err := infoRefCache.PutStream(ctx, in.Repository, in, tr); err != nil { + logger.Errorf("unable to store info-ref response in cache: %q", err) + } + }() + + err = missFn(pw) + _ = pw.CloseWithError(err) // always returns nil + return err + + default: + countErr() + logger.Infof("unable to fetch cached response: %q", err) + + return missFn(w) + } +} diff --git a/internal/service/smarthttp/inforefs.go b/internal/service/smarthttp/inforefs.go index 0070173ae..bff368250 100644 --- a/internal/service/smarthttp/inforefs.go +++ b/internal/service/smarthttp/inforefs.go @@ -16,18 +16,26 @@ import ( "google.golang.org/grpc/status" ) +const ( + uploadPackSvc = "upload-pack" + receivePackSvc = "receive-pack" +) + func (s *server) InfoRefsUploadPack(in *gitalypb.InfoRefsRequest, stream gitalypb.SmartHTTPService_InfoRefsUploadPackServer) error { w := streamio.NewWriter(func(p []byte) error { return stream.Send(&gitalypb.InfoRefsResponse{Data: p}) }) - return handleInfoRefs(stream.Context(), "upload-pack", in, w) + + return tryCache(stream.Context(), in, w, func(w io.Writer) error { + return handleInfoRefs(stream.Context(), uploadPackSvc, in, w) + }) } func (s *server) InfoRefsReceivePack(in *gitalypb.InfoRefsRequest, stream gitalypb.SmartHTTPService_InfoRefsReceivePackServer) error { w := streamio.NewWriter(func(p []byte) error { return stream.Send(&gitalypb.InfoRefsResponse{Data: p}) }) - return handleInfoRefs(stream.Context(), "receive-pack", in, w) + return handleInfoRefs(stream.Context(), receivePackSvc, in, w) } func handleInfoRefs(ctx context.Context, service string, req *gitalypb.InfoRefsRequest, w io.Writer) error { diff --git a/internal/service/smarthttp/inforefs_test.go b/internal/service/smarthttp/inforefs_test.go index 945f4b2f2..0237a6f32 100644 --- a/internal/service/smarthttp/inforefs_test.go +++ b/internal/service/smarthttp/inforefs_test.go @@ -5,16 +5,24 @@ import ( "fmt" "io" "io/ioutil" + "os" + "path/filepath" "strings" "testing" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/cache" + "gitlab.com/gitlab-org/gitaly/internal/config" "gitlab.com/gitlab-org/gitaly/internal/git" "gitlab.com/gitlab-org/gitaly/internal/git/objectpool" + "gitlab.com/gitlab-org/gitaly/internal/helper" + "gitlab.com/gitlab-org/gitaly/internal/metadata/featureflag" + "gitlab.com/gitlab-org/gitaly/internal/tempdir" "gitlab.com/gitlab-org/gitaly/internal/testhelper" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/streamio" "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" ) func TestSuccessfulInfoRefsUploadPack(t *testing.T) { @@ -26,7 +34,7 @@ func TestSuccessfulInfoRefsUploadPack(t *testing.T) { rpcRequest := &gitalypb.InfoRefsRequest{Repository: testRepo} - response, err := makeInfoRefsUploadPackRequest(t, serverSocketPath, rpcRequest) + response, err := makeInfoRefsUploadPackRequest(context.Background(), t, serverSocketPath, rpcRequest) require.NoError(t, err) assertGitRefAdvertisement(t, "InfoRefsUploadPack", string(response), "001e# service=git-upload-pack", "0000", []string{ "003ef4e6814c3e4e7a0de82a9e7cd20c626cc963a2f8 refs/tags/v1.0.0", @@ -48,7 +56,7 @@ func TestSuccessfulInfoRefsUploadPackWithGitConfigOptions(t *testing.T) { GitConfigOptions: []string{"transfer.hideRefs=refs"}, } - response, err := makeInfoRefsUploadPackRequest(t, serverSocketPath, rpcRequest) + response, err := makeInfoRefsUploadPackRequest(context.Background(), t, serverSocketPath, rpcRequest) require.NoError(t, err) assertGitRefAdvertisement(t, "InfoRefsUploadPack", string(response), "001e# service=git-upload-pack", "0000", []string{}) } @@ -90,11 +98,11 @@ func TestSuccessfulInfoRefsUploadPackWithGitProtocol(t *testing.T) { require.Equal(t, fmt.Sprintf("GIT_PROTOCOL=%s\n", git.ProtocolV2), envData) } -func makeInfoRefsUploadPackRequest(t *testing.T, serverSocketPath string, rpcRequest *gitalypb.InfoRefsRequest) ([]byte, error) { +func makeInfoRefsUploadPackRequest(ctx context.Context, t *testing.T, serverSocketPath string, rpcRequest *gitalypb.InfoRefsRequest) ([]byte, error) { client, conn := newSmartHTTPClient(t, serverSocketPath) defer conn.Close() - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(ctx) defer cancel() c, err := client.InfoRefsUploadPack(ctx, rpcRequest) require.NoError(t, err) @@ -238,3 +246,118 @@ func assertGitRefAdvertisement(t *testing.T, rpc, responseBody string, firstLine } } } + +func TestCacheInfoRefsUploadPack(t *testing.T) { + clearCache(t) + + server, serverSocketPath := runSmartHTTPServer(t) + defer server.Stop() + + testRepo, _, cleanupFn := testhelper.NewTestRepo(t) + defer cleanupFn() + + rpcRequest := &gitalypb.InfoRefsRequest{Repository: testRepo} + + ctx := context.Background() + + assertNormalResponse := func() { + response, err := makeInfoRefsUploadPackRequest(ctx, t, serverSocketPath, rpcRequest) + require.NoError(t, err) + + assertGitRefAdvertisement(t, "InfoRefsUploadPack", string(response), + "001e# service=git-upload-pack", "0000", + []string{ + "003ef4e6814c3e4e7a0de82a9e7cd20c626cc963a2f8 refs/tags/v1.0.0", + "00416f6d7e7ed97bb5f0054f2b1df789b39ca89b6ff9 refs/tags/v1.0.0^{}", + }, + ) + } + + // if feature-flag is disabled, we should not find a cached response + assertNormalResponse() + testhelper.AssertFileNotExists(t, pathToCachedResponse(t, rpcRequest)) + + // enable feature flag, and we expect to find the cached response + ctx = enableCacheFeatureFlag(ctx) + assertNormalResponse() + require.FileExists(t, pathToCachedResponse(t, rpcRequest)) + + replacedContents := []string{ + "first line", + "meow meow meow meow", + "woof woof woof woof", + "last line", + } + + // replace cached response file to prove the info-ref uses the cache + replaceCachedResponse(t, rpcRequest, strings.Join(replacedContents, "\n")) + response, err := makeInfoRefsUploadPackRequest(ctx, t, serverSocketPath, rpcRequest) + require.NoError(t, err) + assertGitRefAdvertisement(t, "InfoRefsUploadPack", string(response), + replacedContents[0], replacedContents[3], replacedContents[1:3], + ) + + // disable feature-flag to show replaced response no longer used + ctx = context.Background() + assertNormalResponse() + + // invalidate cache for repository + ender, err := cache.LeaseKeyer{}.StartLease(rpcRequest.Repository) + require.NoError(t, err) + require.NoError(t, ender.EndLease(setInfoRefsUploadPackMethod(context.Background()))) + + // replaced cache response is no longer valid + ctx = enableCacheFeatureFlag(ctx) + assertNormalResponse() + + // failed requests should not cache response + invalidReq := &gitalypb.InfoRefsRequest{ + Repository: &gitalypb.Repository{ + RelativePath: "fake_repo", + StorageName: testRepo.StorageName, + }, + } // invalid request because repo is empty + invalidRepoCleanup := createInvalidRepo(t, invalidReq.Repository) + defer invalidRepoCleanup() + + _, err = makeInfoRefsUploadPackRequest(ctx, t, serverSocketPath, invalidReq) + testhelper.RequireGrpcError(t, err, codes.Internal) + testhelper.AssertFileNotExists(t, pathToCachedResponse(t, invalidReq)) +} + +func createInvalidRepo(t testing.TB, repo *gitalypb.Repository) func() { + repoDir, err := helper.GetPath(repo) + require.NoError(t, err) + for _, subDir := range []string{"objects", "refs", "HEAD"} { + require.NoError(t, os.MkdirAll(filepath.Join(repoDir, subDir), 0755)) + } + return func() { require.NoError(t, os.RemoveAll(repoDir)) } +} + +func replaceCachedResponse(t testing.TB, req *gitalypb.InfoRefsRequest, newContents string) { + path := pathToCachedResponse(t, req) + require.NoError(t, ioutil.WriteFile(path, []byte(newContents), 0644)) +} + +func enableCacheFeatureFlag(ctx context.Context) context.Context { + return metadata.NewOutgoingContext(ctx, metadata.New(map[string]string{ + featureflag.HeaderKey(UploadPackCacheFeatureFlagKey): "true", + })) +} + +func clearCache(t testing.TB) { + for _, storage := range config.Config.Storages { + require.NoError(t, os.RemoveAll(tempdir.CacheDir(storage))) + } +} + +func setInfoRefsUploadPackMethod(ctx context.Context) context.Context { + return testhelper.SetCtxGrpcMethod(ctx, "/gitaly.SmartHTTPService/InfoRefsUploadPack") +} + +func pathToCachedResponse(t testing.TB, req *gitalypb.InfoRefsRequest) string { + ctx := setInfoRefsUploadPackMethod(context.Background()) + path, err := cache.LeaseKeyer{}.KeyPath(ctx, req.GetRepository(), req) + require.NoError(t, err) + return path +} diff --git a/internal/testhelper/grpc.go b/internal/testhelper/grpc.go new file mode 100644 index 000000000..dc28cc37b --- /dev/null +++ b/internal/testhelper/grpc.go @@ -0,0 +1,25 @@ +package testhelper + +import ( + "context" + + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +// SetCtxGrpcMethod will set the gRPC context value for the proper key +// responsible for an RPC full method name. This directly corresponds to the +// gRPC function responsible for extracting the method: +// https://godoc.org/google.golang.org/grpc#Method +func SetCtxGrpcMethod(ctx context.Context, method string) context.Context { + return grpc.NewContextWithServerTransportStream(ctx, mockServerTransportStream{method}) +} + +type mockServerTransportStream struct { + method string +} + +func (msts mockServerTransportStream) Method() string { return msts.method } +func (mockServerTransportStream) SetHeader(md metadata.MD) error { return nil } +func (mockServerTransportStream) SendHeader(md metadata.MD) error { return nil } +func (mockServerTransportStream) SetTrailer(md metadata.MD) error { return nil } diff --git a/internal/testhelper/grpc_test.go b/internal/testhelper/grpc_test.go new file mode 100644 index 000000000..792d7d11a --- /dev/null +++ b/internal/testhelper/grpc_test.go @@ -0,0 +1,19 @@ +package testhelper_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" + "google.golang.org/grpc" +) + +func TestSetCtxGrpcMethod(t *testing.T) { + expectFullMethodName := "/pinkypb/TakeOverTheWorld.SNARF" + ctx := testhelper.SetCtxGrpcMethod(context.Background(), expectFullMethodName) + + actualFullMethodName, ok := grpc.Method(ctx) + require.True(t, ok, "expected context to contain server transport stream") + require.Equal(t, expectFullMethodName, actualFullMethodName) +} |