
gitlab.com/gitlab-org/gitaly.git
author     Quang-Minh Nguyen <qmnguyen@gitlab.com>  2023-04-21 13:15:03 +0300
committer  Quang-Minh Nguyen <qmnguyen@gitlab.com>  2023-05-11 17:59:51 +0300
commit     4db4efafc7d769707e1732bd372dd2c859875856 (patch)
tree       6a2e4308cef09449807a6091e9899959b18d6b6f
parent     e65d5c4597be6d0faf5353a72d0951910ddd97e6 (diff)
Add `grpc-retry-pushback-ms` header when exceeding concurrency limit (branch: qmnguyen0711/add-grpc-retry-pushback-header)
We have a concurrency limit for critical resource-hungry RPCs. When a request exceeds the limit, it is rejected with the ResourceExhausted status code. This commit adds the `grpc-retry-pushback-ms` header to rejection responses. The header is only effective if there is a retry policy configuration for the RPC; the retry policy must declare MaxAttempts and RetryableStatusCodes. The pushback duration has higher precedence than the retry settings in the retry policy.

When a client receives this header, it is forced to sleep for that duration. As this feature is supported in gRPC library implementations, clients can't refuse. It's recommended to add this header to protect the most critical, resource-hungry RPCs, such as UploadPack and PackObject.

The pushback duration is exponential. It is calculated from the number of previous retry attempts of the same request; the retry attempt count is extracted from the `grpc-previous-rpc-attempts` request header.
-rw-r--r--  internal/gitaly/client/dial.go                   10
-rw-r--r--  internal/middleware/limithandler/middleware.go   11
-rw-r--r--  internal/middleware/limithandler/pushback.go    181
3 files changed, 201 insertions(+), 1 deletion(-)
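
For context, a client only benefits from the new header if it dials with a retry policy covering the pushback-enabled RPCs. The following is a minimal, illustrative client-side sketch (not part of this commit, and not needed when using Gitaly's own dial helpers, which pick up the default service config changed below). The target address is a hypothetical placeholder; the retry parameters mirror the defaults introduced in this patch.

package main

import (
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// serviceConfig declares a retry policy for one pushback-enabled RPC. Both
// MaxAttempts and RetryableStatusCodes must be present; otherwise the
// grpc-retry-pushback-ms trailer sent by the server has no effect.
const serviceConfig = `{
  "methodConfig": [{
    "name": [{"service": "gitaly.SSHService", "method": "SSHUploadPackWithSidechannel"}],
    "retryPolicy": {
      "MaxAttempts": 3,
      "InitialBackoff": "5s",
      "MaxBackoff": "60s",
      "BackoffMultiplier": 2.0,
      "RetryableStatusCodes": ["RESOURCE_EXHAUSTED"]
    }
  }]
}`

func main() {
	// The address is a hypothetical placeholder.
	conn, err := grpc.Dial(
		"gitaly.example.com:8075",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultServiceConfig(serviceConfig),
	)
	if err != nil {
		log.Fatalf("dial: %v", err)
	}
	defer conn.Close()

	// Build a gitalypb.SSHServiceClient on top of conn as usual. When the
	// server rejects a request with ResourceExhausted and attaches the
	// pushback trailer, the client library sleeps for that duration before
	// the next automatic retry, up to MaxAttempts.
}
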
diff --git a/internal/gitaly/client/dial.go b/internal/gitaly/client/dial.go
index def5b5fad..bc8cf0bca 100644
--- a/internal/gitaly/client/dial.go
+++ b/internal/gitaly/client/dial.go
@@ -205,6 +205,7 @@ func defaultServiceConfig() string {
LoadBalancingConfig: []*gitalypb.LoadBalancingConfig{{
Policy: &gitalypb.LoadBalancingConfig_RoundRobin{},
}},
+ MethodConfig: defaultMethodConfigs(),
}
configJSON, err := protojson.Marshal(serviceConfig)
if err != nil {
@@ -213,3 +214,12 @@ func defaultServiceConfig() string {
return string(configJSON)
}
+
+func defaultMethodConfigs() []*gitalypb.MethodConfig {
+ var configs []*gitalypb.MethodConfig
+
+ // Add the list of method configs for RPC pushback
+ configs = append(configs, limithandler.DefaultPushbackMethodConfigs()...)
+
+ return configs
+}
diff --git a/internal/middleware/limithandler/middleware.go b/internal/middleware/limithandler/middleware.go
index c532943a4..632bfb1e9 100644
--- a/internal/middleware/limithandler/middleware.go
+++ b/internal/middleware/limithandler/middleware.go
@@ -42,6 +42,7 @@ type LimiterMiddleware struct {
getLockKey GetLockKey
requestsDroppedMetric *prometheus.CounterVec
collect func(metrics chan<- prometheus.Metric)
+ pushback *pushback
}
// New creates a new middleware that limits requests. SetupFunc sets up the
@@ -61,6 +62,7 @@ func New(cfg config.Cfg, getLockKey GetLockKey, setupMiddleware SetupFunc) *Limi
"reason",
},
),
+ pushback: &pushback{policies: defaultPushbackPolicies()},
}
setupMiddleware(cfg, middleware)
@@ -95,9 +97,13 @@ func (c *LimiterMiddleware) UnaryInterceptor() grpc.UnaryServerInterceptor {
return handler(ctx, req)
}
- return limiter.Limit(ctx, lockKey, func() (interface{}, error) {
+ response, err := limiter.Limit(ctx, lockKey, func() (interface{}, error) {
return handler(ctx, req)
})
+ if err != nil {
+ c.pushback.push(ctx, info.FullMethod, err)
+ }
+ return response, err
}
}
@@ -163,6 +169,9 @@ func (w *wrappedStream) RecvMsg(m interface{}) error {
// It's our turn!
return nil
case err := <-errs:
+ if err != nil {
+ w.limiterMiddleware.pushback.push(ctx, w.info.FullMethod, err)
+ }
return err
}
}
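
The middleware changes above attach the push-back duration as a response trailer whenever the limiter rejects a request. A client with a matching retry policy honours it automatically; applications that manage retries themselves can read it from the trailer, which is captured by passing grpc.Trailer(&trailer) as a call option. The helper below is a standalone, hypothetical sketch, not part of this commit.

package pushbackexample

import (
	"strconv"
	"time"

	"google.golang.org/grpc/metadata"
)

// pushbackFromTrailer extracts the back-off duration, if any, that the
// limiter middleware attached to a rejected response.
func pushbackFromTrailer(trailer metadata.MD) (time.Duration, bool) {
	values := trailer.Get("grpc-retry-pushback-ms")
	if len(values) == 0 {
		return 0, false
	}
	ms, err := strconv.ParseInt(values[0], 10, 64)
	if err != nil {
		return 0, false
	}
	return time.Duration(ms) * time.Millisecond, true
}
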
diff --git a/internal/middleware/limithandler/pushback.go b/internal/middleware/limithandler/pushback.go
new file mode 100644
index 000000000..d9726a178
--- /dev/null
+++ b/internal/middleware/limithandler/pushback.go
@@ -0,0 +1,181 @@
+package limithandler
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "math/rand"
+ "strconv"
+ "time"
+
+ "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/backoff"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/protoregistry"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/structerr"
+ "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/metadata"
+ "google.golang.org/protobuf/types/known/durationpb"
+)
+
+const (
+ // GrpcPushbackHeader is the key of the gRPC response header that defines the duration in
+ // milliseconds a client should back off before re-sending the request. This header is
+ // only effective if there is a retry policy configuration for the RPC. The retry policy
+ // must declare MaxAttempts and RetryableStatusCodes. This pushback duration has higher
+ // precedence than the retry settings in the retry policy.
+ //
+ // When a client receives this header, it is forced to sleep for that duration. As this
+ // feature is supported in gRPC library implementations, clients can't refuse. It's
+ // recommended to add this header to protect the most critical, resource-hungry RPCs,
+ // such as UploadPack and PackObject. Please be nice!
+ //
+ // For more information, visit https://github.com/grpc/proposal/blob/master/A6-client-retries.md#pushback
+ GrpcPushbackHeader = "grpc-retry-pushback-ms"
+ // GrpcPreviousAttempts defines the key of the gRPC request header that stores the number of
+ // previous attempts of the same RPC. This number counts "transparent" failures and failures
+ // matching the RetryableStatusCodes retry config. The counter stays in the library layer; if
+ // the application layer performs the retry, this value is always set to 0.
+ GrpcPreviousAttempts = "grpc-previous-rpc-attempts"
+)
+
+// grpcPushbackMaxAttempt defines the maximum number of attempts a client should retry. Other
+// types of transient failures, such as network failures, are also taken into account.
+var grpcPushbackMaxAttempt = 3
+
+// grpcPushbackRetryableStatusCodes defines the list of gRPC status codes for which a client should
+// retry automatically. The status codes are capitalized SNAKE_CASE. The following link contains the
+// list of all codes: https://grpc.github.io/grpc/core/md_doc_statuscodes.html
+var grpcPushbackRetryableStatusCodes = []string{"RESOURCE_EXHAUSTED"}
+
+// Exponential backoff parameters
+var (
+ initialBackoff = 5 * time.Second
+ maxBackoff = 60 * time.Second
+ backoffMultiplier = 2.0
+)
+
+// RPCsWithPushbackHeaders defines the list of Gitaly RPCs with pushback support. Please note
+// that the list must only include external RPCs called by clients. Adding an internal RPC, such as
+// /gitaly.HookService/PackObjectsHookWithSidechannel, would make Gitaly push back on itself.
+var RPCsWithPushbackHeaders = []string{
+ "/gitaly.SSHService/SSHUploadPackWithSidechannel",
+ "/gitaly.SmartHTTPService/PostUploadPackWithSidechannel",
+}
+
+func init() {
+ for _, rpc := range RPCsWithPushbackHeaders {
+ if _, err := protoregistry.GitalyProtoPreregistered.LookupMethod(rpc); err != nil {
+ panic(fmt.Errorf("RPC not found: %s", rpc))
+ }
+ }
+}
+
+// DefaultPushbackMethodConfigs returns the list of gRPC method configs. Each method config includes
+// the name of an RPC with pushback enabled, the maximum attempts, and the retryable status codes.
+// This list should be added to the client's service config when it dials.
+func DefaultPushbackMethodConfigs() []*gitalypb.MethodConfig {
+ var configs []*gitalypb.MethodConfig
+
+ for _, rpc := range RPCsWithPushbackHeaders {
+ // Method is validated when this package is loaded
+ mth, _ := protoregistry.GitalyProtoPreregistered.LookupMethod(rpc)
+
+ serviceName, methodName := mth.ServiceNameAndMethodName()
+ configs = append(configs, &gitalypb.MethodConfig{
+ Name: []*gitalypb.MethodConfig_Name{{
+ Service: serviceName,
+ Method: methodName,
+ }},
+ // When the pushback header grpc-retry-pushback-ms is specified, the client uses
+ // that value. The other exponential backoff parameters in RetryPolicy are then
+ // ignored. We supply them here to make the retry policy valid.
+ RetryPolicy: &gitalypb.MethodConfig_RetryPolicy{
+ MaxAttempts: uint32(grpcPushbackMaxAttempt),
+ RetryableStatusCodes: grpcPushbackRetryableStatusCodes,
+ InitialBackoff: durationpb.New(initialBackoff),
+ MaxBackoff: durationpb.New(maxBackoff),
+ BackoffMultiplier: float32(backoffMultiplier),
+ },
+ })
+ }
+ return configs
+}
+
+// newLimitErrorBackoff returns an exponential backoff strategy used when facing a limit error. The
+// backoff parameters are adjusted to be much longer and steeper than for normal networking failures.
+// When a Gitaly node is saturated and starts to push back traffic, it takes a long time for the
+// situation to go away. The node can either wait for more room to breathe or terminate in-flight
+// requests. Either way, it does not make sense for clients to retry after short delays.
+//
+// | Retries | Delay before random jitter |
+// | ------- | -------------------------- |
+// | 0       | 5 seconds                  |
+// | 1       | 10 seconds                 |
+// | 2       | 20 seconds                 |
+// | 3       | 40 seconds                 |
+func newLimitErrorBackoff() backoff.Strategy {
+ exponential := backoff.NewDefaultExponential(rand.New(rand.NewSource(time.Now().UnixNano())))
+ exponential.BaseDelay = initialBackoff
+ exponential.MaxDelay = maxBackoff
+ exponential.Multiplier = backoffMultiplier
+ return exponential
+}
+
+func defaultPushbackPolicies() map[string]backoff.Strategy {
+ policies := map[string]backoff.Strategy{}
+ for _, rpc := range RPCsWithPushbackHeaders {
+ policies[rpc] = newLimitErrorBackoff()
+ }
+ return policies
+}
+
+type pushback struct {
+ policies map[string]backoff.Strategy
+}
+
+func (p *pushback) push(ctx context.Context, fullMethod string, err error) {
+ if !errors.Is(err, ErrMaxQueueSize) && !errors.Is(err, ErrMaxQueueTime) {
+ return
+ }
+ var strategy backoff.Strategy
+ strategy, exist := p.policies[fullMethod]
+ if !exist {
+ return
+ }
+
+ var attempts uint
+ if strAttempts := metadata.ValueFromIncomingContext(ctx, GrpcPreviousAttempts); len(strAttempts) > 0 {
+ parsedAttempts, err := strconv.ParseInt(strAttempts[0], 10, 32)
+ if err != nil {
+ ctxlogrus.Extract(ctx).WithError(err).Error("failed to parse gRPC previous retry attempts")
+ return
+ }
+ attempts = uint(parsedAttempts)
+ }
+
+ pushbackDuration := strategy.Backoff(attempts)
+ p.setResponseHeader(ctx, pushbackDuration)
+ p.setErrorDetail(err, pushbackDuration)
+}
+
+func (p *pushback) setResponseHeader(ctx context.Context, pushbackDuration time.Duration) {
+ if err := grpc.SetTrailer(ctx, metadata.MD{GrpcPushbackHeader: []string{fmt.Sprintf("%d", pushbackDuration.Milliseconds())}}); err != nil {
+ ctxlogrus.Extract(ctx).WithError(err).Error("failed to set gRPC push-back header")
+ }
+}
+
+func (p *pushback) setErrorDetail(err error, pushbackDuration time.Duration) {
+ var structErr structerr.Error
+ if errors.As(err, &structErr) {
+ for _, detail := range structErr.Details() {
+ if limitError, ok := detail.(*gitalypb.LimitError); ok {
+ // The underlying layers can specify their own RetryAfter value. The
+ // middleware should honor that decision.
+ if limitError.RetryAfter.AsDuration() == time.Duration(0) {
+ limitError.RetryAfter = durationpb.New(pushbackDuration)
+ }
+ }
+ }
+ }
+}
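
To make the schedule in the newLimitErrorBackoff comment concrete: before jitter, the delay doubles from a 5-second base and is capped at 60 seconds (0 -> 5s, 1 -> 10s, 2 -> 20s, 3 -> 40s). The sketch below reproduces that formula in isolation; it assumes the conventional exponential form, whereas the real strategy comes from Gitaly's internal/backoff package and additionally applies random jitter.

package pushbackexample

import (
	"math"
	"time"
)

// Backoff parameters matching the patch above.
const (
	initialBackoff    = 5 * time.Second
	maxBackoff        = 60 * time.Second
	backoffMultiplier = 2.0
)

// pushbackDelay computes the pre-jitter delay for a given number of previous
// attempts, capped at maxBackoff.
func pushbackDelay(previousAttempts uint) time.Duration {
	delay := float64(initialBackoff) * math.Pow(backoffMultiplier, float64(previousAttempts))
	if delay > float64(maxBackoff) {
		return maxBackoff
	}
	return time.Duration(delay)
}
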