Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Okstad <pokstad@gitlab.com>2019-09-18 23:18:03 +0300
committerZeger-Jan van de Weg <git@zjvandeweg.nl>2019-09-18 23:18:03 +0300
commitead52c8746e742c0fa9ed1952da7b44795bb7f05 (patch)
tree2990da9054e69ac78b217211544842d6d788be7a
parent10d40838934326be0bc68e28ea3450be2d3b039c (diff)
Fix cache invalidator for Create* RPCs and health checks
-rw-r--r--changelogs/unreleased/po-fix-cache-invalidator-no-repo.yml5
-rw-r--r--internal/cache/cachedb_test.go5
-rw-r--r--internal/cache/keyer.go10
-rw-r--r--internal/middleware/cache/cache.go11
-rw-r--r--internal/middleware/cache/cache_test.go17
-rw-r--r--internal/middleware/cache/export_test.go20
6 files changed, 58 insertions, 10 deletions
diff --git a/changelogs/unreleased/po-fix-cache-invalidator-no-repo.yml b/changelogs/unreleased/po-fix-cache-invalidator-no-repo.yml
new file mode 100644
index 000000000..fef61d144
--- /dev/null
+++ b/changelogs/unreleased/po-fix-cache-invalidator-no-repo.yml
@@ -0,0 +1,5 @@
+---
+title: Fix cache invalidator for Create* RPCs and health checks
+merge_request: 1494
+author:
+type: fixed
diff --git a/internal/cache/cachedb_test.go b/internal/cache/cachedb_test.go
index 1f006aedb..1fe267fa8 100644
--- a/internal/cache/cachedb_test.go
+++ b/internal/cache/cachedb_test.go
@@ -101,4 +101,9 @@ func TestStreamDBNaiveKeyer(t *testing.T) {
// only completing/removing the pending generation file will allow access
require.NoError(t, repo1Lease.EndLease(ctx))
expectGetMiss(req1)
+
+ // creating a lease on a repo that doesn't exist yet should succeed
+ req1.Repository.RelativePath += "-does-not-exist"
+ _, err = keyer.StartLease(req1.Repository)
+ require.NoError(t, err)
}
diff --git a/internal/cache/keyer.go b/internal/cache/keyer.go
index ab60e7f63..cdebd4fa9 100644
--- a/internal/cache/keyer.go
+++ b/internal/cache/keyer.go
@@ -76,7 +76,7 @@ func (l lease) EndLease(ctx context.Context) error {
}
func updateLatest(repo *gitalypb.Repository) (string, error) {
- repoPath, err := helper.GetRepoPath(repo)
+ repoPath, err := helper.GetPath(repo)
if err != nil {
return "", err
}
@@ -141,7 +141,7 @@ func (LeaseKeyer) KeyPath(ctx context.Context, repo *gitalypb.Repository, req pr
return "", err
}
- repoPath, err := helper.GetRepoPath(repo)
+ repoPath, err := helper.GetPath(repo)
if err != nil {
return "", err
}
@@ -190,7 +190,7 @@ func radixPath(root, key string) (string, error) {
}
func newPendingLease(repo *gitalypb.Repository) (string, error) {
- repoPath, err := helper.GetRepoPath(repo)
+ repoPath, err := helper.GetPath(repo)
if err != nil {
return "", err
}
@@ -223,7 +223,7 @@ func cacheDir(repo *gitalypb.Repository) (string, error) {
}
func currentLeases(repo *gitalypb.Repository) ([]os.FileInfo, error) {
- repoPath, err := helper.GetRepoPath(repo)
+ repoPath, err := helper.GetPath(repo)
if err != nil {
return nil, err
}
@@ -243,7 +243,7 @@ func currentLeases(repo *gitalypb.Repository) ([]os.FileInfo, error) {
}
func currentGenID(repo *gitalypb.Repository) (string, error) {
- repoPath, err := helper.GetRepoPath(repo)
+ repoPath, err := helper.GetPath(repo)
if err != nil {
return "", err
}
diff --git a/internal/middleware/cache/cache.go b/internal/middleware/cache/cache.go
index 2a65c6447..3400f9b00 100644
--- a/internal/middleware/cache/cache.go
+++ b/internal/middleware/cache/cache.go
@@ -3,6 +3,7 @@ package cache
import (
"context"
"fmt"
+ "strings"
"sync"
"github.com/golang/protobuf/proto"
@@ -32,11 +33,16 @@ func methodErrLogger(method string) func(error) {
}
}
+func shouldIgnore(fullMethod string) bool {
+ return strings.HasPrefix(fullMethod, "/grpc.health")
+}
+
// StreamInvalidator will invalidate any mutating RPC that targets a
// repository in a gRPC stream based RPC
func StreamInvalidator(ci Invalidator, reg *protoregistry.Registry) grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
- if !featureflag.IsEnabled(ss.Context(), FeatureFlag) {
+ if !featureflag.IsEnabled(ss.Context(), FeatureFlag) ||
+ shouldIgnore(info.FullMethod) {
return handler(srv, ss)
}
@@ -63,7 +69,8 @@ func StreamInvalidator(ci Invalidator, reg *protoregistry.Registry) grpc.StreamS
// repository in a gRPC unary RPC
func UnaryInvalidator(ci Invalidator, reg *protoregistry.Registry) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
- if !featureflag.IsEnabled(ctx, FeatureFlag) {
+ if !featureflag.IsEnabled(ctx, FeatureFlag) ||
+ shouldIgnore(info.FullMethod) {
return handler(ctx, req)
}
diff --git a/internal/middleware/cache/cache_test.go b/internal/middleware/cache/cache_test.go
index 4296d2e4e..3f5f4065f 100644
--- a/internal/middleware/cache/cache_test.go
+++ b/internal/middleware/cache/cache_test.go
@@ -19,6 +19,8 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
"google.golang.org/grpc"
+ "google.golang.org/grpc/health"
+ "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/metadata"
)
@@ -49,7 +51,7 @@ func TestInvalidators(t *testing.T) {
svc := &testSvc{}
- cli, cleanup := newTestSvc(t, ctx, srvr, svc)
+ cli, cc, cleanup := newTestSvc(t, ctx, srvr, svc)
defer cleanup()
repo1 := &gitalypb.Repository{
@@ -118,6 +120,12 @@ func TestInvalidators(t *testing.T) {
})
require.NoError(t, err)
+ // Health checks should NOT trigger cache invalidation
+ hcr := &grpc_health_v1.HealthCheckRequest{Service: "TestService"}
+ _, err = grpc_health_v1.NewHealthClient(cc).Check(ctx, hcr)
+ require.NoError(t, err)
+ require.Equal(t, 0, cache.MethodErrCount.Method["/grpc.health.v1.Health/Check"])
+
require.Equal(t, expectedInvalidations, mCache.(*mockCache).invalidatedRepos)
require.Equal(t, expectedSvcRequests, svc.repoRequests)
require.Equal(t, 3, mCache.(*mockCache).endedLeases.count)
@@ -167,7 +175,10 @@ func streamFileDesc(t testing.TB) *descriptor.FileDescriptorProto {
return fdp
}
-func newTestSvc(t testing.TB, ctx context.Context, srvr *grpc.Server, svc testdata.TestServiceServer) (testdata.TestServiceClient, func()) {
+func newTestSvc(t testing.TB, ctx context.Context, srvr *grpc.Server, svc testdata.TestServiceServer) (testdata.TestServiceClient, *grpc.ClientConn, func()) {
+ healthSrvr := health.NewServer()
+ grpc_health_v1.RegisterHealthServer(srvr, healthSrvr)
+ healthSrvr.SetServingStatus("TestService", grpc_health_v1.HealthCheckResponse_SERVING)
testdata.RegisterTestServiceServer(srvr, svc)
lis, err := net.Listen("tcp", ":0")
@@ -192,7 +203,7 @@ func newTestSvc(t testing.TB, ctx context.Context, srvr *grpc.Server, svc testda
)
require.NoError(t, err)
- return testdata.NewTestServiceClient(cc), cleanup
+ return testdata.NewTestServiceClient(cc), cc, cleanup
}
type testSvc struct {
diff --git a/internal/middleware/cache/export_test.go b/internal/middleware/cache/export_test.go
new file mode 100644
index 000000000..48c1dab84
--- /dev/null
+++ b/internal/middleware/cache/export_test.go
@@ -0,0 +1,20 @@
+package cache
+
+import "sync"
+
+var MethodErrCount = struct {
+ sync.Mutex
+ Method map[string]int
+}{
+ Method: map[string]int{},
+}
+
+func init() {
+ // override prometheus counter to detect any errors logged for a specific
+ // method
+ countMethodErr = func(method string) {
+ MethodErrCount.Lock()
+ MethodErrCount.Method[method]++
+ MethodErrCount.Unlock()
+ }
+}