Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Okstad <pokstad@gitlab.com>2019-08-03 00:32:38 +0300
committerPaul Okstad <pokstad@gitlab.com>2019-08-03 00:33:03 +0300
commit940331784ce6bf969be19a710fe9ab7b1bb33011 (patch)
tree6a66f3af9980d97f8fbf1fb2e18f79f6105742da
parenta623e3a9027c049f0b4208002c7d06efe6ba64d0 (diff)
Revert "New counter for method errors"
This reverts commit d491e954b5f79fbc212d3d7f8e20bce6b3e1b88c. Revert "Consistent logging of errors" This reverts commit a623e3a9027c049f0b4208002c7d06efe6ba64d0. This change reverts accidentially pushed changes to master.
-rw-r--r--internal/middleware/cache/cache.go92
-rw-r--r--internal/middleware/cache/prometheus.go52
2 files changed, 59 insertions, 85 deletions
diff --git a/internal/middleware/cache/cache.go b/internal/middleware/cache/cache.go
index 533b6a5c7..e39806c7d 100644
--- a/internal/middleware/cache/cache.go
+++ b/internal/middleware/cache/cache.go
@@ -2,9 +2,11 @@ package cache
import (
"context"
+ "fmt"
"sync"
"github.com/golang/protobuf/proto"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
diskcache "gitlab.com/gitlab-org/gitaly/internal/cache"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
@@ -12,6 +14,40 @@ import (
"google.golang.org/grpc"
)
+var (
+ rpcTotal = prometheus.NewCounter(
+ prometheus.CounterOpts{
+ Name: "gitaly_cacheinvalidator_rpc_total",
+ Help: "Total number of RPCs encountered by cache invalidator",
+ },
+ )
+ rpcOpTypes = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: "gitaly_cacheinvalidator_optype_total",
+ Help: "Total number of operation types encountered by cache invalidator",
+ },
+ []string{"type"},
+ )
+)
+
+func init() {
+ prometheus.MustRegister(rpcTotal)
+ prometheus.MustRegister(rpcOpTypes)
+}
+
+func countRPCType(mInfo protoregistry.MethodInfo) {
+ rpcTotal.Inc()
+
+ switch mInfo.Operation {
+ case protoregistry.OpAccessor:
+ rpcOpTypes.WithLabelValues("accessor").Inc()
+ case protoregistry.OpMutator:
+ rpcOpTypes.WithLabelValues("mutator").Inc()
+ default:
+ rpcOpTypes.WithLabelValues("unknown").Inc()
+ }
+}
+
// Invalidator is able to invalidate parts of the cache pertinent to a
// specific repository. Before a repo mutating operation, StartLease should
// be called. Once the operation is complete, the returned LeaseEnder should
@@ -20,32 +56,21 @@ type Invalidator interface {
StartLease(repo *gitalypb.Repository) (diskcache.LeaseEnder, error)
}
-type logger func(format string, args ...interface{})
-
-func methodErrLogger(method string) logger {
- return func(format string, args ...interface{}) {
- countMethodErr(method)
- logrus.WithField("full_method_name", method).Errorf(format, args...)
- }
-}
-
// StreamInvalidator will invalidate any mutating RPC that targets a
// repository in a gRPC stream based RPC
func StreamInvalidator(ci Invalidator, reg *protoregistry.Registry) grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
- errLogger := methodErrLogger(info.FullMethod)
-
mInfo, err := reg.LookupMethod(info.FullMethod)
countRPCType(mInfo)
if err != nil {
- errLogger("unable to lookup method information for %+v", info)
+ logrus.WithField("FullMethodName", info.FullMethod).Errorf("unable to lookup method information for %+v", info)
}
if mInfo.Operation == protoregistry.OpAccessor {
return handler(srv, ss)
}
- handler, callback := invalidateCache(ci, mInfo, handler, errLogger)
+ handler, callback := invalidateCache(ci, mInfo, handler)
peeker := newStreamPeeker(ss, callback)
return handler(srv, peeker)
}
@@ -55,13 +80,10 @@ func StreamInvalidator(ci Invalidator, reg *protoregistry.Registry) grpc.StreamS
// repository in a gRPC unary RPC
func UnaryInvalidator(ci Invalidator, reg *protoregistry.Registry) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
- errLogger := methodErrLogger(info.FullMethod)
-
mInfo, err := reg.LookupMethod(info.FullMethod)
countRPCType(mInfo)
if err != nil {
- errLogger(info.FullMethod, "unable to lookup method information: %q", err)
- return handler(ctx, req)
+ logrus.WithField("full_method_name", info.FullMethod).Errorf("unable to lookup method information for %+v", info)
}
if mInfo.Operation == protoregistry.OpAccessor {
@@ -70,27 +92,24 @@ func UnaryInvalidator(ci Invalidator, reg *protoregistry.Registry) grpc.UnarySer
pbReq, ok := req.(proto.Message)
if !ok {
- errLogger("expected protobuf message but got %T", req)
- return handler(ctx, req)
+ return nil, fmt.Errorf("cache invalidation expected protobuf request, but got %T", req)
}
target, err := mInfo.TargetRepo(pbReq)
if err != nil {
- errLogger("unable to extract target repo: %q", err)
- return handler(ctx, req)
+ return nil, err
}
le, err := ci.StartLease(target)
if err != nil {
- errLogger("unable to start lease: %q", err)
- return handler(ctx, req)
+ return nil, err
}
// wrap the handler to ensure the lease is always ended
return func() (resp interface{}, err error) {
defer func() {
if err := le.EndLease(ctx); err != nil {
- errLogger("unable to end lease: %q", err)
+ logrus.Errorf("unable to end lease: %q", err)
}
}()
return handler(ctx, req)
@@ -98,9 +117,9 @@ func UnaryInvalidator(ci Invalidator, reg *protoregistry.Registry) grpc.UnarySer
}
}
-type recvMsgCallback func(interface{}, error)
+type recvMsgCallback func(interface{}, error) error
-func invalidateCache(ci Invalidator, mInfo protoregistry.MethodInfo, handler grpc.StreamHandler, errLogger logger) (grpc.StreamHandler, recvMsgCallback) {
+func invalidateCache(ci Invalidator, mInfo protoregistry.MethodInfo, handler grpc.StreamHandler) (grpc.StreamHandler, recvMsgCallback) {
var le struct {
sync.RWMutex
diskcache.LeaseEnder
@@ -116,7 +135,7 @@ func invalidateCache(ci Invalidator, mInfo protoregistry.MethodInfo, handler grp
return
}
if err := le.EndLease(stream.Context()); err != nil {
- errLogger("unable to end lease: %q", err)
+ logrus.Errorf("unable to end lease: %q", err)
}
}()
return handler(srv, stream)
@@ -124,19 +143,19 @@ func invalidateCache(ci Invalidator, mInfo protoregistry.MethodInfo, handler grp
// starts the cache lease and sets the lease ender iff the request's target
// repository can be determined from the first request message
- peekerCallback := func(firstReq interface{}, err error) {
+ peekerCallback := func(firstReq interface{}, err error) error {
if err != nil {
- errLogger("peeker received an error: %q", err)
+ return err
}
pbFirstReq, ok := firstReq.(proto.Message)
if !ok {
- errLogger("cache invalidation expected protobuf request, but got %T", firstReq)
+ return fmt.Errorf("cache invalidation expected protobuf request, but got %T", firstReq)
}
target, err := mInfo.TargetRepo(pbFirstReq)
if err != nil {
- errLogger("could not extract target repo: %q", err)
+ return err
}
le.Lock()
@@ -144,8 +163,10 @@ func invalidateCache(ci Invalidator, mInfo protoregistry.MethodInfo, handler grp
le.LeaseEnder, err = ci.StartLease(target)
if err != nil {
- errLogger("could not start lease: %q", err)
+ return err
}
+
+ return nil
}
return wrappedHandler, peekerCallback
@@ -176,6 +197,11 @@ func newStreamPeeker(stream grpc.ServerStream, callback recvMsgCallback) grpc.Se
// that the callback is called on the first call.
func (sp *streamPeeker) RecvMsg(m interface{}) error {
err := sp.ServerStream.RecvMsg(m)
- sp.onFirstRecvOnce.Do(func() { sp.onFirstRecvCallback(m, err) })
+ sp.onFirstRecvOnce.Do(func() {
+ err := sp.onFirstRecvCallback(m, err)
+ if err != nil {
+ logrus.Errorf("unable to invalidate cache: %q", err)
+ }
+ })
return err
}
diff --git a/internal/middleware/cache/prometheus.go b/internal/middleware/cache/prometheus.go
deleted file mode 100644
index 3cabfe9b5..000000000
--- a/internal/middleware/cache/prometheus.go
+++ /dev/null
@@ -1,52 +0,0 @@
-package cache
-
-import (
- "github.com/prometheus/client_golang/prometheus"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
-)
-
-var (
- rpcTotal = prometheus.NewCounter(
- prometheus.CounterOpts{
- Name: "gitaly_cacheinvalidator_rpc_total",
- Help: "Total number of RPCs encountered by cache invalidator",
- },
- )
- rpcOpTypes = prometheus.NewCounterVec(
- prometheus.CounterOpts{
- Name: "gitaly_cacheinvalidator_optype_total",
- Help: "Total number of operation types encountered by cache invalidator",
- },
- []string{"type"},
- )
- methodErrTotals = prometheus.NewCounterVec(
- prometheus.CounterOpts{
- Name: "gitaly_cache_invalidator_error_total",
- Help: "Total number of cache invalidation errors by method",
- },
- []string{"method"},
- )
-)
-
-func init() {
- prometheus.MustRegister(rpcTotal)
- prometheus.MustRegister(rpcOpTypes)
- prometheus.MustRegister(methodErrTotals)
-}
-
-// countMethodErr is a package var to allow for overriding in tests
-var (
- countMethodErr = func(method string) { methodErrTotals.WithLabelValues(method).Add(1) }
- countRPCType = func(mInfo protoregistry.MethodInfo) {
- rpcTotal.Inc()
-
- switch mInfo.Operation {
- case protoregistry.OpAccessor:
- rpcOpTypes.WithLabelValues("accessor").Inc()
- case protoregistry.OpMutator:
- rpcOpTypes.WithLabelValues("mutator").Inc()
- default:
- rpcOpTypes.WithLabelValues("unknown").Inc()
- }
- }
-)