Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZeger-Jan van de Weg <git@zjvandeweg.nl>2019-08-12 14:24:27 +0300
committerZeger-Jan van de Weg <git@zjvandeweg.nl>2019-08-12 14:24:27 +0300
commit1df5f4fffb8a1c153a7220d8f2f74036cb5cf665 (patch)
treee0e3072754b4fb3b703ee4baa296c3436bdf1914
parent0a7a664ae2db1d51d1ad010a80e8024b3ea3c023 (diff)
parentf6b50a2eca4b58e923bc796fe0f691982bc5fadf (diff)
Merge branch 'po-enable-cache-invalidators' into 'master'
Enable disk cache invalidator gRPC interceptors See merge request gitlab-org/gitaly!1373
-rw-r--r--changelogs/unreleased/po-enable-cache-invalidators.yml5
-rw-r--r--internal/middleware/cache/cache.go110
-rw-r--r--internal/middleware/cache/cache_test.go10
-rw-r--r--internal/middleware/cache/prometheus.go52
-rw-r--r--internal/praefect/protoregistry/protoregistry.go14
-rw-r--r--internal/server/server.go5
6 files changed, 135 insertions, 61 deletions
diff --git a/changelogs/unreleased/po-enable-cache-invalidators.yml b/changelogs/unreleased/po-enable-cache-invalidators.yml
new file mode 100644
index 000000000..421dea90d
--- /dev/null
+++ b/changelogs/unreleased/po-enable-cache-invalidators.yml
@@ -0,0 +1,5 @@
+---
+title: Enable disk cache invalidator gRPC interceptors
+merge_request: 1373
+author:
+type: performance
diff --git a/internal/middleware/cache/cache.go b/internal/middleware/cache/cache.go
index e39806c7d..2a65c6447 100644
--- a/internal/middleware/cache/cache.go
+++ b/internal/middleware/cache/cache.go
@@ -6,48 +6,14 @@ import (
"sync"
"github.com/golang/protobuf/proto"
- "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
diskcache "gitlab.com/gitlab-org/gitaly/internal/cache"
+ "gitlab.com/gitlab-org/gitaly/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
"google.golang.org/grpc"
)
-var (
- rpcTotal = prometheus.NewCounter(
- prometheus.CounterOpts{
- Name: "gitaly_cacheinvalidator_rpc_total",
- Help: "Total number of RPCs encountered by cache invalidator",
- },
- )
- rpcOpTypes = prometheus.NewCounterVec(
- prometheus.CounterOpts{
- Name: "gitaly_cacheinvalidator_optype_total",
- Help: "Total number of operation types encountered by cache invalidator",
- },
- []string{"type"},
- )
-)
-
-func init() {
- prometheus.MustRegister(rpcTotal)
- prometheus.MustRegister(rpcOpTypes)
-}
-
-func countRPCType(mInfo protoregistry.MethodInfo) {
- rpcTotal.Inc()
-
- switch mInfo.Operation {
- case protoregistry.OpAccessor:
- rpcOpTypes.WithLabelValues("accessor").Inc()
- case protoregistry.OpMutator:
- rpcOpTypes.WithLabelValues("mutator").Inc()
- default:
- rpcOpTypes.WithLabelValues("unknown").Inc()
- }
-}
-
// Invalidator is able to invalidate parts of the cache pertinent to a
// specific repository. Before a repo mutating operation, StartLease should
// be called. Once the operation is complete, the returned LeaseEnder should
@@ -56,21 +22,38 @@ type Invalidator interface {
StartLease(repo *gitalypb.Repository) (diskcache.LeaseEnder, error)
}
+// FeatureFlag enables the cache invalidator
+const FeatureFlag = "cache-invalidator"
+
+func methodErrLogger(method string) func(error) {
+ return func(err error) {
+ countMethodErr(method)
+ logrus.WithField("full_method_name", method).Error(err)
+ }
+}
+
// StreamInvalidator will invalidate any mutating RPC that targets a
// repository in a gRPC stream based RPC
func StreamInvalidator(ci Invalidator, reg *protoregistry.Registry) grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+ if !featureflag.IsEnabled(ss.Context(), FeatureFlag) {
+ return handler(srv, ss)
+ }
+
+ errLogger := methodErrLogger(info.FullMethod)
+
mInfo, err := reg.LookupMethod(info.FullMethod)
countRPCType(mInfo)
if err != nil {
- logrus.WithField("FullMethodName", info.FullMethod).Errorf("unable to lookup method information for %+v", info)
+ errLogger(err)
+ return handler(srv, ss)
}
- if mInfo.Operation == protoregistry.OpAccessor {
+ if mInfo.Scope != protoregistry.ScopeRepository || mInfo.Operation == protoregistry.OpAccessor {
return handler(srv, ss)
}
- handler, callback := invalidateCache(ci, mInfo, handler)
+ handler, callback := invalidateCache(ci, mInfo, handler, errLogger)
peeker := newStreamPeeker(ss, callback)
return handler(srv, peeker)
}
@@ -80,36 +63,46 @@ func StreamInvalidator(ci Invalidator, reg *protoregistry.Registry) grpc.StreamS
// repository in a gRPC unary RPC
func UnaryInvalidator(ci Invalidator, reg *protoregistry.Registry) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
+ if !featureflag.IsEnabled(ctx, FeatureFlag) {
+ return handler(ctx, req)
+ }
+
+ errLogger := methodErrLogger(info.FullMethod)
+
mInfo, err := reg.LookupMethod(info.FullMethod)
countRPCType(mInfo)
if err != nil {
- logrus.WithField("full_method_name", info.FullMethod).Errorf("unable to lookup method information for %+v", info)
+ errLogger(err)
+ return handler(ctx, req)
}
- if mInfo.Operation == protoregistry.OpAccessor {
+ if mInfo.Scope != protoregistry.ScopeRepository || mInfo.Operation == protoregistry.OpAccessor {
return handler(ctx, req)
}
pbReq, ok := req.(proto.Message)
if !ok {
- return nil, fmt.Errorf("cache invalidation expected protobuf request, but got %T", req)
+ errLogger(fmt.Errorf("expected protobuf message but got %T", req))
+ return handler(ctx, req)
}
target, err := mInfo.TargetRepo(pbReq)
if err != nil {
- return nil, err
+ errLogger(err)
+ return handler(ctx, req)
}
le, err := ci.StartLease(target)
if err != nil {
- return nil, err
+ errLogger(err)
+ return handler(ctx, req)
}
// wrap the handler to ensure the lease is always ended
return func() (resp interface{}, err error) {
defer func() {
if err := le.EndLease(ctx); err != nil {
- logrus.Errorf("unable to end lease: %q", err)
+ errLogger(err)
}
}()
return handler(ctx, req)
@@ -117,9 +110,9 @@ func UnaryInvalidator(ci Invalidator, reg *protoregistry.Registry) grpc.UnarySer
}
}
-type recvMsgCallback func(interface{}, error) error
+type recvMsgCallback func(interface{}, error)
-func invalidateCache(ci Invalidator, mInfo protoregistry.MethodInfo, handler grpc.StreamHandler) (grpc.StreamHandler, recvMsgCallback) {
+func invalidateCache(ci Invalidator, mInfo protoregistry.MethodInfo, handler grpc.StreamHandler, errLogger func(error)) (grpc.StreamHandler, recvMsgCallback) {
var le struct {
sync.RWMutex
diskcache.LeaseEnder
@@ -135,7 +128,7 @@ func invalidateCache(ci Invalidator, mInfo protoregistry.MethodInfo, handler grp
return
}
if err := le.EndLease(stream.Context()); err != nil {
- logrus.Errorf("unable to end lease: %q", err)
+ errLogger(err)
}
}()
return handler(srv, stream)
@@ -143,19 +136,22 @@ func invalidateCache(ci Invalidator, mInfo protoregistry.MethodInfo, handler grp
// starts the cache lease and sets the lease ender iff the request's target
// repository can be determined from the first request message
- peekerCallback := func(firstReq interface{}, err error) error {
+ peekerCallback := func(firstReq interface{}, err error) {
if err != nil {
- return err
+ errLogger(err)
+ return
}
pbFirstReq, ok := firstReq.(proto.Message)
if !ok {
- return fmt.Errorf("cache invalidation expected protobuf request, but got %T", firstReq)
+ errLogger(fmt.Errorf("cache invalidation expected protobuf request, but got %T", firstReq))
+ return
}
target, err := mInfo.TargetRepo(pbFirstReq)
if err != nil {
- return err
+ errLogger(err)
+ return
}
le.Lock()
@@ -163,10 +159,9 @@ func invalidateCache(ci Invalidator, mInfo protoregistry.MethodInfo, handler grp
le.LeaseEnder, err = ci.StartLease(target)
if err != nil {
- return err
+ errLogger(err)
+ return
}
-
- return nil
}
return wrappedHandler, peekerCallback
@@ -197,11 +192,6 @@ func newStreamPeeker(stream grpc.ServerStream, callback recvMsgCallback) grpc.Se
// that the callback is called on the first call.
func (sp *streamPeeker) RecvMsg(m interface{}) error {
err := sp.ServerStream.RecvMsg(m)
- sp.onFirstRecvOnce.Do(func() {
- err := sp.onFirstRecvCallback(m, err)
- if err != nil {
- logrus.Errorf("unable to invalidate cache: %q", err)
- }
- })
+ sp.onFirstRecvOnce.Do(func() { sp.onFirstRecvCallback(m, err) })
return err
}
diff --git a/internal/middleware/cache/cache_test.go b/internal/middleware/cache/cache_test.go
index dd5a1a927..4296d2e4e 100644
--- a/internal/middleware/cache/cache_test.go
+++ b/internal/middleware/cache/cache_test.go
@@ -13,11 +13,13 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
diskcache "gitlab.com/gitlab-org/gitaly/internal/cache"
+ "gitlab.com/gitlab-org/gitaly/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/internal/middleware/cache"
"gitlab.com/gitlab-org/gitaly/internal/middleware/cache/testdata"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
"google.golang.org/grpc"
+ "google.golang.org/grpc/metadata"
)
//go:generate make testdata/stream.pb.go
@@ -43,6 +45,8 @@ func TestInvalidators(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
+ ctx = enableCache(ctx)
+
svc := &testSvc{}
cli, cleanup := newTestSvc(t, ctx, srvr, svc)
@@ -119,6 +123,12 @@ func TestInvalidators(t *testing.T) {
require.Equal(t, 3, mCache.(*mockCache).endedLeases.count)
}
+func enableCache(ctx context.Context) context.Context {
+ return metadata.NewOutgoingContext(ctx, metadata.New(map[string]string{
+ featureflag.HeaderKey(cache.FeatureFlag): "true",
+ }))
+}
+
// mockCache allows us to relay back via channel which repos are being
// invalidated in the cache
type mockCache struct {
diff --git a/internal/middleware/cache/prometheus.go b/internal/middleware/cache/prometheus.go
new file mode 100644
index 000000000..949945382
--- /dev/null
+++ b/internal/middleware/cache/prometheus.go
@@ -0,0 +1,52 @@
+package cache
+
+import (
+ "github.com/prometheus/client_golang/prometheus"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
+)
+
+var (
+ rpcTotal = prometheus.NewCounter(
+ prometheus.CounterOpts{
+ Name: "gitaly_cacheinvalidator_rpc_total",
+ Help: "Total number of RPCs encountered by cache invalidator",
+ },
+ )
+ rpcOpTypes = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: "gitaly_cacheinvalidator_optype_total",
+ Help: "Total number of operation types encountered by cache invalidator",
+ },
+ []string{"type"},
+ )
+ methodErrTotals = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: "gitaly_cacheinvalidator_error_total",
+ Help: "Total number of cache invalidation errors by method",
+ },
+ []string{"method"},
+ )
+)
+
+func init() {
+ prometheus.MustRegister(rpcTotal)
+ prometheus.MustRegister(rpcOpTypes)
+ prometheus.MustRegister(methodErrTotals)
+}
+
+// counter functions are package vars to allow for overriding in tests
+var (
+ countMethodErr = func(method string) { methodErrTotals.WithLabelValues(method).Inc() }
+ countRPCType = func(mInfo protoregistry.MethodInfo) {
+ rpcTotal.Inc()
+
+ switch mInfo.Operation {
+ case protoregistry.OpAccessor:
+ rpcOpTypes.WithLabelValues("accessor").Inc()
+ case protoregistry.OpMutator:
+ rpcOpTypes.WithLabelValues("mutator").Inc()
+ default:
+ rpcOpTypes.WithLabelValues("unknown").Inc()
+ }
+ }
+)
diff --git a/internal/praefect/protoregistry/protoregistry.go b/internal/praefect/protoregistry/protoregistry.go
index 28610ae8b..363e023be 100644
--- a/internal/praefect/protoregistry/protoregistry.go
+++ b/internal/praefect/protoregistry/protoregistry.go
@@ -17,7 +17,14 @@ import (
)
// GitalyProtoFileDescriptors is a slice of all gitaly registered file descriptors
-var GitalyProtoFileDescriptors []*descriptor.FileDescriptorProto
+var (
+ // GitalyProtoFileDescriptors is a slice of all gitaly registered file
+ // descriptors
+ GitalyProtoFileDescriptors []*descriptor.FileDescriptorProto
+ // GitalyProtoPreregistered is a proto registry pre-registered with all
+ // GitalyProtoFileDescriptors file descriptor protos
+ GitalyProtoPreregistered *Registry
+)
func init() {
for _, protoName := range gitalypb.GitalyProtos {
@@ -29,6 +36,11 @@ func init() {
GitalyProtoFileDescriptors = append(GitalyProtoFileDescriptors, fd)
}
+
+ GitalyProtoPreregistered = New()
+ if err := GitalyProtoPreregistered.RegisterFiles(GitalyProtoFileDescriptors...); err != nil {
+ panic(err)
+ }
}
// OpType represents the operation type for a RPC method
diff --git a/internal/server/server.go b/internal/server/server.go
index 32bae8d2a..7b3d340c5 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -9,15 +9,18 @@ import (
grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
log "github.com/sirupsen/logrus"
+ diskcache "gitlab.com/gitlab-org/gitaly/internal/cache"
"gitlab.com/gitlab-org/gitaly/internal/config"
"gitlab.com/gitlab-org/gitaly/internal/helper/fieldextractors"
gitalylog "gitlab.com/gitlab-org/gitaly/internal/log"
"gitlab.com/gitlab-org/gitaly/internal/logsanitizer"
+ "gitlab.com/gitlab-org/gitaly/internal/middleware/cache"
"gitlab.com/gitlab-org/gitaly/internal/middleware/cancelhandler"
"gitlab.com/gitlab-org/gitaly/internal/middleware/limithandler"
"gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/internal/middleware/panichandler"
"gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/internal/rubyserver"
"gitlab.com/gitlab-org/gitaly/internal/server/auth"
"gitlab.com/gitlab-org/gitaly/internal/service"
@@ -84,6 +87,7 @@ func createNewServer(rubyServer *rubyserver.Server, secure bool) *grpc.Server {
lh.StreamInterceptor(),
auth.StreamServerInterceptor(),
grpctracing.StreamServerTracingInterceptor(),
+ cache.StreamInvalidator(diskcache.LeaseKeyer{}, protoregistry.GitalyProtoPreregistered),
// Panic handler should remain last so that application panics will be
// converted to errors and logged
panichandler.StreamPanicHandler,
@@ -99,6 +103,7 @@ func createNewServer(rubyServer *rubyserver.Server, secure bool) *grpc.Server {
lh.UnaryInterceptor(),
auth.UnaryServerInterceptor(),
grpctracing.UnaryServerTracingInterceptor(),
+ cache.UnaryInvalidator(diskcache.LeaseKeyer{}, protoregistry.GitalyProtoPreregistered),
// Panic handler should remain last so that application panics will be
// converted to errors and logged
panichandler.UnaryPanicHandler,