diff options
author | Zeger-Jan van de Weg <git@zjvandeweg.nl> | 2019-08-12 14:24:27 +0300 |
---|---|---|
committer | Zeger-Jan van de Weg <git@zjvandeweg.nl> | 2019-08-12 14:24:27 +0300 |
commit | 1df5f4fffb8a1c153a7220d8f2f74036cb5cf665 (patch) | |
tree | e0e3072754b4fb3b703ee4baa296c3436bdf1914 | |
parent | 0a7a664ae2db1d51d1ad010a80e8024b3ea3c023 (diff) | |
parent | f6b50a2eca4b58e923bc796fe0f691982bc5fadf (diff) |
Merge branch 'po-enable-cache-invalidators' into 'master'
Enable disk cache invalidator gRPC interceptors
See merge request gitlab-org/gitaly!1373
-rw-r--r-- | changelogs/unreleased/po-enable-cache-invalidators.yml | 5 | ||||
-rw-r--r-- | internal/middleware/cache/cache.go | 110 | ||||
-rw-r--r-- | internal/middleware/cache/cache_test.go | 10 | ||||
-rw-r--r-- | internal/middleware/cache/prometheus.go | 52 | ||||
-rw-r--r-- | internal/praefect/protoregistry/protoregistry.go | 14 | ||||
-rw-r--r-- | internal/server/server.go | 5 |
6 files changed, 135 insertions, 61 deletions
diff --git a/changelogs/unreleased/po-enable-cache-invalidators.yml b/changelogs/unreleased/po-enable-cache-invalidators.yml new file mode 100644 index 000000000..421dea90d --- /dev/null +++ b/changelogs/unreleased/po-enable-cache-invalidators.yml @@ -0,0 +1,5 @@ +--- +title: Enable disk cache invalidator gRPC interceptors +merge_request: 1373 +author: +type: performance diff --git a/internal/middleware/cache/cache.go b/internal/middleware/cache/cache.go index e39806c7d..2a65c6447 100644 --- a/internal/middleware/cache/cache.go +++ b/internal/middleware/cache/cache.go @@ -6,48 +6,14 @@ import ( "sync" "github.com/golang/protobuf/proto" - "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" diskcache "gitlab.com/gitlab-org/gitaly/internal/cache" + "gitlab.com/gitlab-org/gitaly/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "google.golang.org/grpc" ) -var ( - rpcTotal = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "gitaly_cacheinvalidator_rpc_total", - Help: "Total number of RPCs encountered by cache invalidator", - }, - ) - rpcOpTypes = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "gitaly_cacheinvalidator_optype_total", - Help: "Total number of operation types encountered by cache invalidator", - }, - []string{"type"}, - ) -) - -func init() { - prometheus.MustRegister(rpcTotal) - prometheus.MustRegister(rpcOpTypes) -} - -func countRPCType(mInfo protoregistry.MethodInfo) { - rpcTotal.Inc() - - switch mInfo.Operation { - case protoregistry.OpAccessor: - rpcOpTypes.WithLabelValues("accessor").Inc() - case protoregistry.OpMutator: - rpcOpTypes.WithLabelValues("mutator").Inc() - default: - rpcOpTypes.WithLabelValues("unknown").Inc() - } -} - // Invalidator is able to invalidate parts of the cache pertinent to a // specific repository. Before a repo mutating operation, StartLease should // be called. Once the operation is complete, the returned LeaseEnder should @@ -56,21 +22,38 @@ type Invalidator interface { StartLease(repo *gitalypb.Repository) (diskcache.LeaseEnder, error) } +// FeatureFlag enables the cache invalidator +const FeatureFlag = "cache-invalidator" + +func methodErrLogger(method string) func(error) { + return func(err error) { + countMethodErr(method) + logrus.WithField("full_method_name", method).Error(err) + } +} + // StreamInvalidator will invalidate any mutating RPC that targets a // repository in a gRPC stream based RPC func StreamInvalidator(ci Invalidator, reg *protoregistry.Registry) grpc.StreamServerInterceptor { return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + if !featureflag.IsEnabled(ss.Context(), FeatureFlag) { + return handler(srv, ss) + } + + errLogger := methodErrLogger(info.FullMethod) + mInfo, err := reg.LookupMethod(info.FullMethod) countRPCType(mInfo) if err != nil { - logrus.WithField("FullMethodName", info.FullMethod).Errorf("unable to lookup method information for %+v", info) + errLogger(err) + return handler(srv, ss) } - if mInfo.Operation == protoregistry.OpAccessor { + if mInfo.Scope != protoregistry.ScopeRepository || mInfo.Operation == protoregistry.OpAccessor { return handler(srv, ss) } - handler, callback := invalidateCache(ci, mInfo, handler) + handler, callback := invalidateCache(ci, mInfo, handler, errLogger) peeker := newStreamPeeker(ss, callback) return handler(srv, peeker) } @@ -80,36 +63,46 @@ func StreamInvalidator(ci Invalidator, reg *protoregistry.Registry) grpc.StreamS // repository in a gRPC unary RPC func UnaryInvalidator(ci Invalidator, reg *protoregistry.Registry) grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + if !featureflag.IsEnabled(ctx, FeatureFlag) { + return handler(ctx, req) + } + + errLogger := methodErrLogger(info.FullMethod) + mInfo, err := reg.LookupMethod(info.FullMethod) countRPCType(mInfo) if err != nil { - logrus.WithField("full_method_name", info.FullMethod).Errorf("unable to lookup method information for %+v", info) + errLogger(err) + return handler(ctx, req) } - if mInfo.Operation == protoregistry.OpAccessor { + if mInfo.Scope != protoregistry.ScopeRepository || mInfo.Operation == protoregistry.OpAccessor { return handler(ctx, req) } pbReq, ok := req.(proto.Message) if !ok { - return nil, fmt.Errorf("cache invalidation expected protobuf request, but got %T", req) + errLogger(fmt.Errorf("expected protobuf message but got %T", req)) + return handler(ctx, req) } target, err := mInfo.TargetRepo(pbReq) if err != nil { - return nil, err + errLogger(err) + return handler(ctx, req) } le, err := ci.StartLease(target) if err != nil { - return nil, err + errLogger(err) + return handler(ctx, req) } // wrap the handler to ensure the lease is always ended return func() (resp interface{}, err error) { defer func() { if err := le.EndLease(ctx); err != nil { - logrus.Errorf("unable to end lease: %q", err) + errLogger(err) } }() return handler(ctx, req) @@ -117,9 +110,9 @@ func UnaryInvalidator(ci Invalidator, reg *protoregistry.Registry) grpc.UnarySer } } -type recvMsgCallback func(interface{}, error) error +type recvMsgCallback func(interface{}, error) -func invalidateCache(ci Invalidator, mInfo protoregistry.MethodInfo, handler grpc.StreamHandler) (grpc.StreamHandler, recvMsgCallback) { +func invalidateCache(ci Invalidator, mInfo protoregistry.MethodInfo, handler grpc.StreamHandler, errLogger func(error)) (grpc.StreamHandler, recvMsgCallback) { var le struct { sync.RWMutex diskcache.LeaseEnder @@ -135,7 +128,7 @@ func invalidateCache(ci Invalidator, mInfo protoregistry.MethodInfo, handler grp return } if err := le.EndLease(stream.Context()); err != nil { - logrus.Errorf("unable to end lease: %q", err) + errLogger(err) } }() return handler(srv, stream) @@ -143,19 +136,22 @@ func invalidateCache(ci Invalidator, mInfo protoregistry.MethodInfo, handler grp // starts the cache lease and sets the lease ender iff the request's target // repository can be determined from the first request message - peekerCallback := func(firstReq interface{}, err error) error { + peekerCallback := func(firstReq interface{}, err error) { if err != nil { - return err + errLogger(err) + return } pbFirstReq, ok := firstReq.(proto.Message) if !ok { - return fmt.Errorf("cache invalidation expected protobuf request, but got %T", firstReq) + errLogger(fmt.Errorf("cache invalidation expected protobuf request, but got %T", firstReq)) + return } target, err := mInfo.TargetRepo(pbFirstReq) if err != nil { - return err + errLogger(err) + return } le.Lock() @@ -163,10 +159,9 @@ func invalidateCache(ci Invalidator, mInfo protoregistry.MethodInfo, handler grp le.LeaseEnder, err = ci.StartLease(target) if err != nil { - return err + errLogger(err) + return } - - return nil } return wrappedHandler, peekerCallback @@ -197,11 +192,6 @@ func newStreamPeeker(stream grpc.ServerStream, callback recvMsgCallback) grpc.Se // that the callback is called on the first call. func (sp *streamPeeker) RecvMsg(m interface{}) error { err := sp.ServerStream.RecvMsg(m) - sp.onFirstRecvOnce.Do(func() { - err := sp.onFirstRecvCallback(m, err) - if err != nil { - logrus.Errorf("unable to invalidate cache: %q", err) - } - }) + sp.onFirstRecvOnce.Do(func() { sp.onFirstRecvCallback(m, err) }) return err } diff --git a/internal/middleware/cache/cache_test.go b/internal/middleware/cache/cache_test.go index dd5a1a927..4296d2e4e 100644 --- a/internal/middleware/cache/cache_test.go +++ b/internal/middleware/cache/cache_test.go @@ -13,11 +13,13 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" diskcache "gitlab.com/gitlab-org/gitaly/internal/cache" + "gitlab.com/gitlab-org/gitaly/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/internal/middleware/cache" "gitlab.com/gitlab-org/gitaly/internal/middleware/cache/testdata" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "google.golang.org/grpc" + "google.golang.org/grpc/metadata" ) //go:generate make testdata/stream.pb.go @@ -43,6 +45,8 @@ func TestInvalidators(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() + ctx = enableCache(ctx) + svc := &testSvc{} cli, cleanup := newTestSvc(t, ctx, srvr, svc) @@ -119,6 +123,12 @@ func TestInvalidators(t *testing.T) { require.Equal(t, 3, mCache.(*mockCache).endedLeases.count) } +func enableCache(ctx context.Context) context.Context { + return metadata.NewOutgoingContext(ctx, metadata.New(map[string]string{ + featureflag.HeaderKey(cache.FeatureFlag): "true", + })) +} + // mockCache allows us to relay back via channel which repos are being // invalidated in the cache type mockCache struct { diff --git a/internal/middleware/cache/prometheus.go b/internal/middleware/cache/prometheus.go new file mode 100644 index 000000000..949945382 --- /dev/null +++ b/internal/middleware/cache/prometheus.go @@ -0,0 +1,52 @@ +package cache + +import ( + "github.com/prometheus/client_golang/prometheus" + "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" +) + +var ( + rpcTotal = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "gitaly_cacheinvalidator_rpc_total", + Help: "Total number of RPCs encountered by cache invalidator", + }, + ) + rpcOpTypes = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "gitaly_cacheinvalidator_optype_total", + Help: "Total number of operation types encountered by cache invalidator", + }, + []string{"type"}, + ) + methodErrTotals = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "gitaly_cacheinvalidator_error_total", + Help: "Total number of cache invalidation errors by method", + }, + []string{"method"}, + ) +) + +func init() { + prometheus.MustRegister(rpcTotal) + prometheus.MustRegister(rpcOpTypes) + prometheus.MustRegister(methodErrTotals) +} + +// counter functions are package vars to allow for overriding in tests +var ( + countMethodErr = func(method string) { methodErrTotals.WithLabelValues(method).Inc() } + countRPCType = func(mInfo protoregistry.MethodInfo) { + rpcTotal.Inc() + + switch mInfo.Operation { + case protoregistry.OpAccessor: + rpcOpTypes.WithLabelValues("accessor").Inc() + case protoregistry.OpMutator: + rpcOpTypes.WithLabelValues("mutator").Inc() + default: + rpcOpTypes.WithLabelValues("unknown").Inc() + } + } +) diff --git a/internal/praefect/protoregistry/protoregistry.go b/internal/praefect/protoregistry/protoregistry.go index 28610ae8b..363e023be 100644 --- a/internal/praefect/protoregistry/protoregistry.go +++ b/internal/praefect/protoregistry/protoregistry.go @@ -17,7 +17,14 @@ import ( ) // GitalyProtoFileDescriptors is a slice of all gitaly registered file descriptors -var GitalyProtoFileDescriptors []*descriptor.FileDescriptorProto +var ( + // GitalyProtoFileDescriptors is a slice of all gitaly registered file + // descriptors + GitalyProtoFileDescriptors []*descriptor.FileDescriptorProto + // GitalyProtoPreregistered is a proto registry pre-registered with all + // GitalyProtoFileDescriptors file descriptor protos + GitalyProtoPreregistered *Registry +) func init() { for _, protoName := range gitalypb.GitalyProtos { @@ -29,6 +36,11 @@ func init() { GitalyProtoFileDescriptors = append(GitalyProtoFileDescriptors, fd) } + + GitalyProtoPreregistered = New() + if err := GitalyProtoPreregistered.RegisterFiles(GitalyProtoFileDescriptors...); err != nil { + panic(err) + } } // OpType represents the operation type for a RPC method diff --git a/internal/server/server.go b/internal/server/server.go index 32bae8d2a..7b3d340c5 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -9,15 +9,18 @@ import ( grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" log "github.com/sirupsen/logrus" + diskcache "gitlab.com/gitlab-org/gitaly/internal/cache" "gitlab.com/gitlab-org/gitaly/internal/config" "gitlab.com/gitlab-org/gitaly/internal/helper/fieldextractors" gitalylog "gitlab.com/gitlab-org/gitaly/internal/log" "gitlab.com/gitlab-org/gitaly/internal/logsanitizer" + "gitlab.com/gitlab-org/gitaly/internal/middleware/cache" "gitlab.com/gitlab-org/gitaly/internal/middleware/cancelhandler" "gitlab.com/gitlab-org/gitaly/internal/middleware/limithandler" "gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/internal/middleware/panichandler" "gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler" + "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/internal/rubyserver" "gitlab.com/gitlab-org/gitaly/internal/server/auth" "gitlab.com/gitlab-org/gitaly/internal/service" @@ -84,6 +87,7 @@ func createNewServer(rubyServer *rubyserver.Server, secure bool) *grpc.Server { lh.StreamInterceptor(), auth.StreamServerInterceptor(), grpctracing.StreamServerTracingInterceptor(), + cache.StreamInvalidator(diskcache.LeaseKeyer{}, protoregistry.GitalyProtoPreregistered), // Panic handler should remain last so that application panics will be // converted to errors and logged panichandler.StreamPanicHandler, @@ -99,6 +103,7 @@ func createNewServer(rubyServer *rubyserver.Server, secure bool) *grpc.Server { lh.UnaryInterceptor(), auth.UnaryServerInterceptor(), grpctracing.UnaryServerTracingInterceptor(), + cache.UnaryInvalidator(diskcache.LeaseKeyer{}, protoregistry.GitalyProtoPreregistered), // Panic handler should remain last so that application panics will be // converted to errors and logged panichandler.UnaryPanicHandler, |