diff options
author | Paul Okstad <pokstad@gitlab.com> | 2019-08-03 00:32:38 +0300 |
---|---|---|
committer | Paul Okstad <pokstad@gitlab.com> | 2019-08-03 00:33:03 +0300 |
commit | 940331784ce6bf969be19a710fe9ab7b1bb33011 (patch) | |
tree | 6a66f3af9980d97f8fbf1fb2e18f79f6105742da | |
parent | a623e3a9027c049f0b4208002c7d06efe6ba64d0 (diff) |
Revert "New counter for method errors"
This reverts commit d491e954b5f79fbc212d3d7f8e20bce6b3e1b88c.
Revert "Consistent logging of errors"
This reverts commit a623e3a9027c049f0b4208002c7d06efe6ba64d0.
This change reverts accidentially pushed changes to master.
-rw-r--r-- | internal/middleware/cache/cache.go | 92 | ||||
-rw-r--r-- | internal/middleware/cache/prometheus.go | 52 |
2 files changed, 59 insertions, 85 deletions
diff --git a/internal/middleware/cache/cache.go b/internal/middleware/cache/cache.go index 533b6a5c7..e39806c7d 100644 --- a/internal/middleware/cache/cache.go +++ b/internal/middleware/cache/cache.go @@ -2,9 +2,11 @@ package cache import ( "context" + "fmt" "sync" "github.com/golang/protobuf/proto" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" diskcache "gitlab.com/gitlab-org/gitaly/internal/cache" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" @@ -12,6 +14,40 @@ import ( "google.golang.org/grpc" ) +var ( + rpcTotal = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "gitaly_cacheinvalidator_rpc_total", + Help: "Total number of RPCs encountered by cache invalidator", + }, + ) + rpcOpTypes = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "gitaly_cacheinvalidator_optype_total", + Help: "Total number of operation types encountered by cache invalidator", + }, + []string{"type"}, + ) +) + +func init() { + prometheus.MustRegister(rpcTotal) + prometheus.MustRegister(rpcOpTypes) +} + +func countRPCType(mInfo protoregistry.MethodInfo) { + rpcTotal.Inc() + + switch mInfo.Operation { + case protoregistry.OpAccessor: + rpcOpTypes.WithLabelValues("accessor").Inc() + case protoregistry.OpMutator: + rpcOpTypes.WithLabelValues("mutator").Inc() + default: + rpcOpTypes.WithLabelValues("unknown").Inc() + } +} + // Invalidator is able to invalidate parts of the cache pertinent to a // specific repository. Before a repo mutating operation, StartLease should // be called. Once the operation is complete, the returned LeaseEnder should @@ -20,32 +56,21 @@ type Invalidator interface { StartLease(repo *gitalypb.Repository) (diskcache.LeaseEnder, error) } -type logger func(format string, args ...interface{}) - -func methodErrLogger(method string) logger { - return func(format string, args ...interface{}) { - countMethodErr(method) - logrus.WithField("full_method_name", method).Errorf(format, args...) - } -} - // StreamInvalidator will invalidate any mutating RPC that targets a // repository in a gRPC stream based RPC func StreamInvalidator(ci Invalidator, reg *protoregistry.Registry) grpc.StreamServerInterceptor { return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { - errLogger := methodErrLogger(info.FullMethod) - mInfo, err := reg.LookupMethod(info.FullMethod) countRPCType(mInfo) if err != nil { - errLogger("unable to lookup method information for %+v", info) + logrus.WithField("FullMethodName", info.FullMethod).Errorf("unable to lookup method information for %+v", info) } if mInfo.Operation == protoregistry.OpAccessor { return handler(srv, ss) } - handler, callback := invalidateCache(ci, mInfo, handler, errLogger) + handler, callback := invalidateCache(ci, mInfo, handler) peeker := newStreamPeeker(ss, callback) return handler(srv, peeker) } @@ -55,13 +80,10 @@ func StreamInvalidator(ci Invalidator, reg *protoregistry.Registry) grpc.StreamS // repository in a gRPC unary RPC func UnaryInvalidator(ci Invalidator, reg *protoregistry.Registry) grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { - errLogger := methodErrLogger(info.FullMethod) - mInfo, err := reg.LookupMethod(info.FullMethod) countRPCType(mInfo) if err != nil { - errLogger(info.FullMethod, "unable to lookup method information: %q", err) - return handler(ctx, req) + logrus.WithField("full_method_name", info.FullMethod).Errorf("unable to lookup method information for %+v", info) } if mInfo.Operation == protoregistry.OpAccessor { @@ -70,27 +92,24 @@ func UnaryInvalidator(ci Invalidator, reg *protoregistry.Registry) grpc.UnarySer pbReq, ok := req.(proto.Message) if !ok { - errLogger("expected protobuf message but got %T", req) - return handler(ctx, req) + return nil, fmt.Errorf("cache invalidation expected protobuf request, but got %T", req) } target, err := mInfo.TargetRepo(pbReq) if err != nil { - errLogger("unable to extract target repo: %q", err) - return handler(ctx, req) + return nil, err } le, err := ci.StartLease(target) if err != nil { - errLogger("unable to start lease: %q", err) - return handler(ctx, req) + return nil, err } // wrap the handler to ensure the lease is always ended return func() (resp interface{}, err error) { defer func() { if err := le.EndLease(ctx); err != nil { - errLogger("unable to end lease: %q", err) + logrus.Errorf("unable to end lease: %q", err) } }() return handler(ctx, req) @@ -98,9 +117,9 @@ func UnaryInvalidator(ci Invalidator, reg *protoregistry.Registry) grpc.UnarySer } } -type recvMsgCallback func(interface{}, error) +type recvMsgCallback func(interface{}, error) error -func invalidateCache(ci Invalidator, mInfo protoregistry.MethodInfo, handler grpc.StreamHandler, errLogger logger) (grpc.StreamHandler, recvMsgCallback) { +func invalidateCache(ci Invalidator, mInfo protoregistry.MethodInfo, handler grpc.StreamHandler) (grpc.StreamHandler, recvMsgCallback) { var le struct { sync.RWMutex diskcache.LeaseEnder @@ -116,7 +135,7 @@ func invalidateCache(ci Invalidator, mInfo protoregistry.MethodInfo, handler grp return } if err := le.EndLease(stream.Context()); err != nil { - errLogger("unable to end lease: %q", err) + logrus.Errorf("unable to end lease: %q", err) } }() return handler(srv, stream) @@ -124,19 +143,19 @@ func invalidateCache(ci Invalidator, mInfo protoregistry.MethodInfo, handler grp // starts the cache lease and sets the lease ender iff the request's target // repository can be determined from the first request message - peekerCallback := func(firstReq interface{}, err error) { + peekerCallback := func(firstReq interface{}, err error) error { if err != nil { - errLogger("peeker received an error: %q", err) + return err } pbFirstReq, ok := firstReq.(proto.Message) if !ok { - errLogger("cache invalidation expected protobuf request, but got %T", firstReq) + return fmt.Errorf("cache invalidation expected protobuf request, but got %T", firstReq) } target, err := mInfo.TargetRepo(pbFirstReq) if err != nil { - errLogger("could not extract target repo: %q", err) + return err } le.Lock() @@ -144,8 +163,10 @@ func invalidateCache(ci Invalidator, mInfo protoregistry.MethodInfo, handler grp le.LeaseEnder, err = ci.StartLease(target) if err != nil { - errLogger("could not start lease: %q", err) + return err } + + return nil } return wrappedHandler, peekerCallback @@ -176,6 +197,11 @@ func newStreamPeeker(stream grpc.ServerStream, callback recvMsgCallback) grpc.Se // that the callback is called on the first call. func (sp *streamPeeker) RecvMsg(m interface{}) error { err := sp.ServerStream.RecvMsg(m) - sp.onFirstRecvOnce.Do(func() { sp.onFirstRecvCallback(m, err) }) + sp.onFirstRecvOnce.Do(func() { + err := sp.onFirstRecvCallback(m, err) + if err != nil { + logrus.Errorf("unable to invalidate cache: %q", err) + } + }) return err } diff --git a/internal/middleware/cache/prometheus.go b/internal/middleware/cache/prometheus.go deleted file mode 100644 index 3cabfe9b5..000000000 --- a/internal/middleware/cache/prometheus.go +++ /dev/null @@ -1,52 +0,0 @@ -package cache - -import ( - "github.com/prometheus/client_golang/prometheus" - "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" -) - -var ( - rpcTotal = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "gitaly_cacheinvalidator_rpc_total", - Help: "Total number of RPCs encountered by cache invalidator", - }, - ) - rpcOpTypes = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "gitaly_cacheinvalidator_optype_total", - Help: "Total number of operation types encountered by cache invalidator", - }, - []string{"type"}, - ) - methodErrTotals = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "gitaly_cache_invalidator_error_total", - Help: "Total number of cache invalidation errors by method", - }, - []string{"method"}, - ) -) - -func init() { - prometheus.MustRegister(rpcTotal) - prometheus.MustRegister(rpcOpTypes) - prometheus.MustRegister(methodErrTotals) -} - -// countMethodErr is a package var to allow for overriding in tests -var ( - countMethodErr = func(method string) { methodErrTotals.WithLabelValues(method).Add(1) } - countRPCType = func(mInfo protoregistry.MethodInfo) { - rpcTotal.Inc() - - switch mInfo.Operation { - case protoregistry.OpAccessor: - rpcOpTypes.WithLabelValues("accessor").Inc() - case protoregistry.OpMutator: - rpcOpTypes.WithLabelValues("mutator").Inc() - default: - rpcOpTypes.WithLabelValues("unknown").Inc() - } - } -) |