-rw-r--r--  internal/gitaly/client/dial.go                 |  10
-rw-r--r--  internal/middleware/limithandler/middleware.go |  11
-rw-r--r--  internal/middleware/limithandler/pushback.go   | 181
3 files changed, 201 insertions(+), 1 deletion(-)
diff --git a/internal/gitaly/client/dial.go b/internal/gitaly/client/dial.go
index def5b5fad..bc8cf0bca 100644
--- a/internal/gitaly/client/dial.go
+++ b/internal/gitaly/client/dial.go
@@ -205,6 +205,7 @@ func defaultServiceConfig() string {
 		LoadBalancingConfig: []*gitalypb.LoadBalancingConfig{{
 			Policy: &gitalypb.LoadBalancingConfig_RoundRobin{},
 		}},
+		MethodConfig: defaultMethodConfigs(),
 	}
 	configJSON, err := protojson.Marshal(serviceConfig)
 	if err != nil {
@@ -213,3 +214,12 @@ func defaultServiceConfig() string {
 
 	return string(configJSON)
 }
+
+func defaultMethodConfigs() []*gitalypb.MethodConfig {
+	var configs []*gitalypb.MethodConfig
+
+	// Add the list of method configs for RPC pushback.
+	configs = append(configs, limithandler.DefaultPushbackMethodConfigs()...)
+
+	return configs
+}
diff --git a/internal/middleware/limithandler/middleware.go b/internal/middleware/limithandler/middleware.go
index c532943a4..632bfb1e9 100644
--- a/internal/middleware/limithandler/middleware.go
+++ b/internal/middleware/limithandler/middleware.go
@@ -42,6 +42,7 @@ type LimiterMiddleware struct {
 	getLockKey            GetLockKey
 	requestsDroppedMetric *prometheus.CounterVec
 	collect               func(metrics chan<- prometheus.Metric)
+	pushback              *pushback
 }
 
 // New creates a new middleware that limits requests. SetupFunc sets up the
@@ -61,6 +62,7 @@ func New(cfg config.Cfg, getLockKey GetLockKey, setupMiddleware SetupFunc) *Limi
 				"reason",
 			},
 		),
+		pushback: &pushback{policies: defaultPushbackPolicies()},
 	}
 	setupMiddleware(cfg, middleware)
 
@@ -95,9 +97,13 @@ func (c *LimiterMiddleware) UnaryInterceptor() grpc.UnaryServerInterceptor {
 			return handler(ctx, req)
 		}
 
-		return limiter.Limit(ctx, lockKey, func() (interface{}, error) {
+		response, err := limiter.Limit(ctx, lockKey, func() (interface{}, error) {
 			return handler(ctx, req)
 		})
+		if err != nil {
+			c.pushback.push(ctx, info.FullMethod, err)
+		}
+		return response, err
 	}
 }
 
@@ -163,6 +169,9 @@ func (w *wrappedStream) RecvMsg(m interface{}) error {
 			// It's our turn!
 			return nil
 		case err := <-errs:
+			if err != nil {
+				w.limiterMiddleware.pushback.push(ctx, w.info.FullMethod, err)
+			}
 			return err
 		}
 	}
diff --git a/internal/middleware/limithandler/pushback.go b/internal/middleware/limithandler/pushback.go
new file mode 100644
index 000000000..d9726a178
--- /dev/null
+++ b/internal/middleware/limithandler/pushback.go
@@ -0,0 +1,181 @@
+package limithandler
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"math/rand"
+	"strconv"
+	"time"
+
+	"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
+	"gitlab.com/gitlab-org/gitaly/v15/internal/backoff"
+	"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/protoregistry"
+	"gitlab.com/gitlab-org/gitaly/v15/internal/structerr"
+	"gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/metadata"
+	"google.golang.org/protobuf/types/known/durationpb"
+)
+
+const (
+	// GrpcPushbackHeader is the key of the gRPC response header that defines the duration in
+	// milliseconds a client should back off before re-sending the request. This header is
+	// only effective if there is a retry policy configured for the RPC. The retry policy
+	// must declare MaxAttempts and RetryableStatusCodes. This pushback duration takes
+	// precedence over the retry settings in the retry policy.
+	//
+	// When a client receives this header, it is forced to sleep. As this feature is supported
+	// in gRPC library implementations, clients can't refuse. It's recommended to add this
+	// header to protect the most critical, resource-hungry RPCs, such as UploadPack and
+	// PackObject. Please be nice!
+	//
+	// For more information, visit https://github.com/grpc/proposal/blob/master/A6-client-retries.md#pushback
+	GrpcPushbackHeader = "grpc-retry-pushback-ms"
+	// GrpcPreviousAttempts defines the key of the gRPC request header that stores the number
+	// of previous attempts of the same RPC. This number counts "transparent" failures and
+	// failures matching the RetryableStatusCodes retry config. The counter stays in the
+	// library layer. If the application layer performs the retry, this value is always set
+	// to 0.
+	GrpcPreviousAttempts = "grpc-previous-rpc-attempts"
+)
+
+// grpcPushbackMaxAttempt defines the maximum number of attempts a client should make. Other types
+// of transient failures, such as network failures, are also taken into account.
+var grpcPushbackMaxAttempt = 3
+
+// grpcPushbackRetryableStatusCodes defines the list of gRPC codes for which a client should retry
+// automatically. The status codes are capitalized SNAKE_CASE. The following link contains the list
+// of all codes: https://grpc.github.io/grpc/core/md_doc_statuscodes.html
+var grpcPushbackRetryableStatusCodes = []string{"RESOURCE_EXHAUSTED"}
+
+// Exponential backoff parameters
+var (
+	initialBackoff    = 5 * time.Second
+	maxBackoff        = 60 * time.Second
+	backoffMultiplier = 2.0
+)
+
+// RPCsWithPushbackHeaders defines the list of Gitaly RPCs that get pushback support. Please note
+// that the list must only include RPCs called by external clients. Adding an internal RPC, such as
+// /gitaly.HookService/PackObjectsHookWithSidechannel, makes Gitaly push back against itself.
+var RPCsWithPushbackHeaders = []string{
+	"/gitaly.SSHService/SSHUploadPackWithSidechannel",
+	"/gitaly.SmartHTTPService/PostUploadPackWithSidechannel",
+}
+
+func init() {
+	for _, rpc := range RPCsWithPushbackHeaders {
+		if _, err := protoregistry.GitalyProtoPreregistered.LookupMethod(rpc); err != nil {
+			panic(fmt.Errorf("RPC not found: %s", rpc))
+		}
+	}
+}
+
+// DefaultPushbackMethodConfigs returns the list of gRPC method configs. Each method config
+// includes the name of an RPC with pushback enabled, the maximum attempts, and the retryable
+// status codes. This list should be added to the client's service config when it dials.
+func DefaultPushbackMethodConfigs() []*gitalypb.MethodConfig {
+	var configs []*gitalypb.MethodConfig
+
+	for _, rpc := range RPCsWithPushbackHeaders {
+		// The method is validated when this package is loaded.
+		mth, _ := protoregistry.GitalyProtoPreregistered.LookupMethod(rpc)
+
+		serviceName, methodName := mth.ServiceNameAndMethodName()
+		configs = append(configs, &gitalypb.MethodConfig{
+			Name: []*gitalypb.MethodConfig_Name{{
+				Service: serviceName,
+				Method:  methodName,
+			}},
+			// When the grpc-retry-pushback-ms header is present, the client uses that
+			// value and the exponential backoff parameters in RetryPolicy are ignored.
+			// We supply them here to make the config valid.
+			RetryPolicy: &gitalypb.MethodConfig_RetryPolicy{
+				MaxAttempts:          uint32(grpcPushbackMaxAttempt),
+				RetryableStatusCodes: grpcPushbackRetryableStatusCodes,
+				InitialBackoff:       durationpb.New(initialBackoff),
+				MaxBackoff:           durationpb.New(maxBackoff),
+				BackoffMultiplier:    float32(backoffMultiplier),
+			},
+		})
+	}
+	return configs
+}
+
+// newLimitErrorBackoff returns an exponential backoff strategy for limit errors. The backoff
+// parameters are much longer and steeper than for a normal networking failure. When a Gitaly node
+// is saturated and starts to push back traffic, it takes a long time for the situation to clear
+// up. The node can either wait for more room to breathe or terminate in-flight requests. Either
+// way, it does not make sense for clients to retry after short delays.
+//
+// | Retries | Delay before random jitter |
+// | ------- | -------------------------- |
+// | 0       | 5 seconds                  |
+// | 1       | 10 seconds                 |
+// | 2       | 20 seconds                 |
+// | 3       | 40 seconds                 |
func newLimitErrorBackoff() backoff.Strategy {
+	exponential := backoff.NewDefaultExponential(rand.New(rand.NewSource(time.Now().UnixNano())))
+	exponential.BaseDelay = initialBackoff
+	exponential.MaxDelay = maxBackoff
+	exponential.Multiplier = backoffMultiplier
+	return exponential
+}
+
+func defaultPushbackPolicies() map[string]backoff.Strategy {
+	policies := map[string]backoff.Strategy{}
+	for _, rpc := range RPCsWithPushbackHeaders {
+		policies[rpc] = newLimitErrorBackoff()
+	}
+	return policies
+}
+
+type pushback struct {
+	policies map[string]backoff.Strategy
+}
+
+func (p *pushback) push(ctx context.Context, fullMethod string, err error) {
+	if !errors.Is(err, ErrMaxQueueSize) && !errors.Is(err, ErrMaxQueueTime) {
+		return
+	}
+	var strategy backoff.Strategy
+	strategy, exist := p.policies[fullMethod]
+	if !exist {
+		return
+	}
+
+	var attempts uint
+	if strAttempts := metadata.ValueFromIncomingContext(ctx, GrpcPreviousAttempts); len(strAttempts) > 0 {
+		parsedAttempts, err := strconv.ParseInt(strAttempts[0], 10, 32)
+		if err != nil {
+			ctxlogrus.Extract(ctx).WithError(err).Error("fail to parse gRPC previous retry attempts")
+			return
+		}
+		attempts = uint(parsedAttempts)
+	}
+
+	pushbackDuration := strategy.Backoff(attempts)
+	p.setResponseHeader(ctx, pushbackDuration)
+	p.setErrorDetail(err, pushbackDuration)
+}
+
+func (p *pushback) setResponseHeader(ctx context.Context, pushbackDuration time.Duration) {
+	if err := grpc.SetTrailer(ctx, metadata.MD{GrpcPushbackHeader: []string{fmt.Sprintf("%d", pushbackDuration.Milliseconds())}}); err != nil {
+		ctxlogrus.Extract(ctx).WithError(err).Error("fail to set gRPC push-back header")
+	}
+}
+
+func (p *pushback) setErrorDetail(err error, pushbackDuration time.Duration) {
+	var structErr structerr.Error
+	if errors.As(err, &structErr) {
+		for _, detail := range structErr.Details() {
+			if limitError, ok := detail.(*gitalypb.LimitError); ok {
+				// The underlying layers can specify their own RetryAfter value. The
+				// middleware should honor that decision.
+				if limitError.RetryAfter.AsDuration() == time.Duration(0) {
+					limitError.RetryAfter = durationpb.New(pushbackDuration)
+				}
+			}
+		}
+	}
+}
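Note on the client side: the MethodConfig entries built in DefaultPushbackMethodConfigs marshal, via protojson, into the standard gRPC service-config JSON described in the A6 retry proposal. The sketch below is a minimal, hand-written illustration of that shape for a plain grpc-go client; the address is a placeholder and the literal JSON emitted by protojson.Marshal may differ in field order and optional fields, so treat it as an approximation rather than the exact output of this change.

package main

import (
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// serviceConfig mirrors the shape produced by defaultServiceConfig() in dial.go:
// round-robin load balancing plus a retry policy for one pushback-enabled RPC.
// Field names follow the public gRPC service-config JSON schema.
const serviceConfig = `{
  "loadBalancingConfig": [{"round_robin": {}}],
  "methodConfig": [{
    "name": [{"service": "gitaly.SSHService", "method": "SSHUploadPackWithSidechannel"}],
    "retryPolicy": {
      "maxAttempts": 3,
      "initialBackoff": "5s",
      "maxBackoff": "60s",
      "backoffMultiplier": 2.0,
      "retryableStatusCodes": ["RESOURCE_EXHAUSTED"]
    }
  }]
}`

func main() {
	// "gitaly.example.com:8075" is a placeholder address for this sketch.
	conn, err := grpc.Dial(
		"gitaly.example.com:8075",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultServiceConfig(serviceConfig),
	)
	if err != nil {
		log.Fatalf("dial: %v", err)
	}
	defer conn.Close()

	// Any RPC on conn that fails with RESOURCE_EXHAUSTED is now retried by
	// grpc-go itself, and a grpc-retry-pushback-ms trailer from the server
	// overrides the exponential backoff above for the next attempt.
}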
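The retries/delay table in the newLimitErrorBackoff comment follows directly from the three backoff parameters: delay(n) = min(initialBackoff * multiplier^n, maxBackoff), before jitter. The self-contained sketch below reproduces that arithmetic without Gitaly's internal backoff package (whose exact jitter behaviour is not shown in this diff), purely to illustrate where the 5s/10s/20s/40s values come from.

package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	const (
		initialBackoff    = 5 * time.Second
		maxBackoff        = 60 * time.Second
		backoffMultiplier = 2.0
	)

	// delay(n) = min(initialBackoff * multiplier^n, maxBackoff), before random jitter.
	for attempts := 0; attempts <= 3; attempts++ {
		delay := time.Duration(float64(initialBackoff) * math.Pow(backoffMultiplier, float64(attempts)))
		if delay > maxBackoff {
			delay = maxBackoff
		}
		fmt.Printf("retries=%d delay=%s\n", attempts, delay) // prints 5s, 10s, 20s, 40s
	}
}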