diff options
author | Andrew Newdigate <andrew@gitlab.com> | 2017-09-30 00:19:50 +0300 |
---|---|---|
committer | Andrew Newdigate <andrew@gitlab.com> | 2017-09-30 00:19:50 +0300 |
commit | d1f47d9ff0824fe007d3525b3a87df311249dd51 (patch) | |
tree | db8ed9e33d03a2b4c649dc247ed003918e20414c | |
parent | 409fb4e5f38e135fb16bb2850d3513df832d6e0b (diff) | |
parent | bb849b4c136c7085e8b7b85126ec12f5caec0a21 (diff) |
Merge branch 'rate-limiter' into 'master'
Rate limiter
See merge request gitlab-org/gitaly!376
-rw-r--r-- | CHANGELOG.md | 2 | ||||
-rw-r--r-- | NOTICE | 53 | ||||
-rw-r--r-- | cmd/gitaly/main.go | 1 | ||||
-rw-r--r-- | config.toml.example | 5 | ||||
-rw-r--r-- | internal/config/concurrency.go | 16 | ||||
-rw-r--r-- | internal/config/config.go | 27 | ||||
-rw-r--r-- | internal/config/prometheus.go | 3 | ||||
-rw-r--r-- | internal/middleware/limithandler/concurrency_limiter.go | 127 | ||||
-rw-r--r-- | internal/middleware/limithandler/concurrency_limiter_test.go | 181 | ||||
-rw-r--r-- | internal/middleware/limithandler/limithandler.go | 95 | ||||
-rw-r--r-- | internal/middleware/limithandler/metrics.go | 113 | ||||
-rw-r--r-- | internal/server/server.go | 5 | ||||
-rw-r--r-- | vendor/golang.org/x/sync/LICENSE | 27 | ||||
-rw-r--r-- | vendor/golang.org/x/sync/PATENTS | 22 | ||||
-rw-r--r-- | vendor/golang.org/x/sync/semaphore/semaphore.go | 131 | ||||
-rw-r--r-- | vendor/vendor.json | 6 |
16 files changed, 804 insertions, 10 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index bcc5e33d8..7108ff2a8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ UNRELEASED +- Allow individual endpoints to be rate-limited per-repository + https://gitlab.com/gitlab-org/gitaly/merge_requests/376 - Implement OperationService.UserDeleteBranch RPC https://gitlab.com/gitlab-org/gitaly/merge_requests/377 - Fix path bug in CommitService::FindCommits @@ -1927,6 +1927,59 @@ infringement, or inducement of patent infringement, then any patent rights granted to you under this License for this implementation of Go shall terminate as of the date such litigation is filed. ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +LICENSE - gitlab.com/gitlab-org/gitaly/vendor/golang.org/x/sync +Copyright (c) 2009 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +PATENTS - gitlab.com/gitlab-org/gitaly/vendor/golang.org/x/sync +Additional IP Rights Grant (Patents) + +"This implementation" means the copyrightable works distributed by +Google as part of the Go project. + +Google hereby grants to You a perpetual, worldwide, non-exclusive, +no-charge, royalty-free, irrevocable (except as stated in this section) +patent license to make, have made, use, offer to sell, sell, import, +transfer and otherwise run, modify and propagate the contents of this +implementation of Go, where such license applies only to those patent +claims, both currently owned or controlled by Google and acquired in +the future, licensable by Google that are necessarily infringed by this +implementation of Go. This grant does not include claims that would be +infringed only as a consequence of further modification of this +implementation. If you or your agent or exclusive licensee institute or +order or agree to the institution of patent litigation against any +entity (including a cross-claim or counterclaim in a lawsuit) alleging +that this implementation of Go or any code incorporated within this +implementation of Go constitutes direct or contributory patent +infringement, or inducement of patent infringement, then any patent +rights granted to you under this License for this implementation of Go +shall terminate as of the date such litigation is filed. +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ LICENSE - gitlab.com/gitlab-org/gitaly/vendor/golang.org/x/sys Copyright (c) 2009 The Go Authors. All rights reserved. diff --git a/cmd/gitaly/main.go b/cmd/gitaly/main.go index 348c627d7..1ba31c693 100644 --- a/cmd/gitaly/main.go +++ b/cmd/gitaly/main.go @@ -92,6 +92,7 @@ func main() { config.ConfigureLogging() config.ConfigureSentry(version.GetVersion()) config.ConfigurePrometheus() + config.ConfigureConcurrencyLimits() var listeners []net.Listener diff --git a/config.toml.example b/config.toml.example index 5fe394217..69c813c4f 100644 --- a/config.toml.example +++ b/config.toml.example @@ -42,3 +42,8 @@ dir = "/home/git/gitaly/ruby" [gitlab-shell] # The directory where gitlab-shell is installed dir = "/home/git/gitlab-shell" + +# # You can adjust the concurrency of each RPC endpoint +# [[concurrency]] +# rpc = "/gitaly.RepositoryService/GarbageCollect" +# max_per_repo = 1 diff --git a/internal/config/concurrency.go b/internal/config/concurrency.go new file mode 100644 index 000000000..7211bd3e1 --- /dev/null +++ b/internal/config/concurrency.go @@ -0,0 +1,16 @@ +package config + +import ( + "gitlab.com/gitlab-org/gitaly/internal/middleware/limithandler" +) + +// ConfigureConcurrencyLimits configures the per-repo, per RPC rate limits +func ConfigureConcurrencyLimits() { + maxConcurrencyPerRepoPerRPC := make(map[string]int) + + for _, v := range Config.Concurrency { + maxConcurrencyPerRepoPerRPC[v.RPC] = v.MaxPerRepo + } + + limithandler.SetMaxRepoConcurrency(maxConcurrencyPerRepoPerRPC) +} diff --git a/internal/config/config.go b/internal/config/config.go index 61e0705fe..d1eac0ad5 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -19,16 +19,17 @@ var ( ) type config struct { - SocketPath string `toml:"socket_path" split_words:"true"` - ListenAddr string `toml:"listen_addr" split_words:"true"` - PrometheusListenAddr string `toml:"prometheus_listen_addr" split_words:"true"` - Git Git `toml:"git" envconfig:"git"` - Storages []Storage `toml:"storage" envconfig:"storage"` - Logging Logging `toml:"logging" envconfig:"logging"` - Prometheus Prometheus `toml:"prometheus"` - Auth Auth `toml:"auth"` - Ruby Ruby `toml:"gitaly-ruby"` - GitlabShell GitlabShell `toml:"gitlab-shell"` + SocketPath string `toml:"socket_path" split_words:"true"` + ListenAddr string `toml:"listen_addr" split_words:"true"` + PrometheusListenAddr string `toml:"prometheus_listen_addr" split_words:"true"` + Git Git `toml:"git" envconfig:"git"` + Storages []Storage `toml:"storage" envconfig:"storage"` + Logging Logging `toml:"logging" envconfig:"logging"` + Prometheus Prometheus `toml:"prometheus"` + Auth Auth `toml:"auth"` + Ruby Ruby `toml:"gitaly-ruby"` + GitlabShell GitlabShell `toml:"gitlab-shell"` + Concurrency []Concurrency `toml:"concurrency"` } // GitlabShell contains the settings required for executing `gitlab-shell` @@ -58,6 +59,12 @@ type Prometheus struct { GRPCLatencyBuckets []float64 `toml:"grpc_latency_buckets"` } +// Concurrency allows endpoints to be limited to a maximum concurrency per repo +type Concurrency struct { + RPC string `toml:"rpc"` + MaxPerRepo int `toml:"max_per_repo"` +} + // Load initializes the Config variable from file and the environment. // Environment variables take precedence over the file. func Load(file io.Reader) error { diff --git a/internal/config/prometheus.go b/internal/config/prometheus.go index eb5dacc5b..2d8936f9a 100644 --- a/internal/config/prometheus.go +++ b/internal/config/prometheus.go @@ -4,6 +4,7 @@ import ( "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/internal/middleware/limithandler" ) // ConfigurePrometheus uses the global configuration to configure prometheus @@ -17,4 +18,6 @@ func ConfigurePrometheus() { grpc_prometheus.EnableHandlingTimeHistogram(func(histogramOpts *prometheus.HistogramOpts) { histogramOpts.Buckets = Config.Prometheus.GRPCLatencyBuckets }) + + limithandler.EnableAcquireTimeHistogram(Config.Prometheus.GRPCLatencyBuckets) } diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go new file mode 100644 index 000000000..56c67ae89 --- /dev/null +++ b/internal/middleware/limithandler/concurrency_limiter.go @@ -0,0 +1,127 @@ +package limithandler + +import ( + "sync" + "time" + + "golang.org/x/net/context" + "golang.org/x/sync/semaphore" +) + +// LimitedFunc represents a function that will be limited +type LimitedFunc func() (resp interface{}, err error) + +// ConcurrencyMonitor allows the concurrency monitor to be observed +type ConcurrencyMonitor interface { + Queued(ctx context.Context) + Dequeued(ctx context.Context) + Enter(ctx context.Context, acquireTime time.Duration) + Exit(ctx context.Context) +} + +// ConcurrencyLimiter contains rate limiter state +type ConcurrencyLimiter struct { + // A weighted semaphore is like a mutex, but with a number of 'slots'. + // When locking the locker requests 1 or more slots to be locked. + // In this package, the number of slots is the number of concurrent requests the rate limiter lets through. + // https://godoc.org/golang.org/x/sync/semaphore + semaphores map[string]*semaphore.Weighted + max int64 + mux *sync.Mutex + monitor ConcurrencyMonitor +} + +// Lazy create a semaphore for the given key +func (c *ConcurrencyLimiter) getSemaphore(lockKey string) *semaphore.Weighted { + c.mux.Lock() + defer c.mux.Unlock() + + ws := c.semaphores[lockKey] + if ws != nil { + return ws + } + + w := semaphore.NewWeighted(c.max) + c.semaphores[lockKey] = w + return w +} + +func (c *ConcurrencyLimiter) attemptCollection(lockKey string) { + c.mux.Lock() + defer c.mux.Unlock() + + ws := c.semaphores[lockKey] + if ws == nil { + return + } + + if !ws.TryAcquire(c.max) { + return + } + + // By releasing, we prevent a lockup of goroutines that have already + // acquired the semaphore, but have yet to acquire on it + ws.Release(c.max) + + // If we managed to acquire all the locks, we can remove the semaphore for this key + delete(c.semaphores, lockKey) +} + +func (c *ConcurrencyLimiter) countSemaphores() int { + c.mux.Lock() + defer c.mux.Unlock() + + return len(c.semaphores) +} + +// Limit will limit the concurrency of f +func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey string, f LimitedFunc) (interface{}, error) { + if c.max <= 0 { + return f() + } + + start := time.Now() + c.monitor.Queued(ctx) + + w := c.getSemaphore(lockKey) + + // Attempt to cleanup the semaphore it's no longer being used + defer c.attemptCollection(lockKey) + + err := w.Acquire(ctx, 1) + c.monitor.Dequeued(ctx) + + if err != nil { + return nil, err + } + + c.monitor.Enter(ctx, time.Since(start)) + defer c.monitor.Exit(ctx) + + defer w.Release(1) + + resp, err := f() + + return resp, err +} + +// NewLimiter creates a new rate limiter +func NewLimiter(max int, monitor ConcurrencyMonitor) *ConcurrencyLimiter { + if monitor == nil { + monitor = &nullConcurrencyMonitor{} + } + + return &ConcurrencyLimiter{ + semaphores: make(map[string]*semaphore.Weighted), + max: int64(max), + mux: &sync.Mutex{}, + monitor: monitor, + } +} + +type nullConcurrencyMonitor struct{} + +func (c *nullConcurrencyMonitor) Queued(ctx context.Context) {} +func (c *nullConcurrencyMonitor) Dequeued(ctx context.Context) {} +func (c *nullConcurrencyMonitor) Enter(ctx context.Context, acquireTime time.Duration) {} +func (c *nullConcurrencyMonitor) Exit(ctx context.Context) {} diff --git a/internal/middleware/limithandler/concurrency_limiter_test.go b/internal/middleware/limithandler/concurrency_limiter_test.go new file mode 100644 index 000000000..f045c2c31 --- /dev/null +++ b/internal/middleware/limithandler/concurrency_limiter_test.go @@ -0,0 +1,181 @@ +package limithandler + +import ( + "strconv" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "golang.org/x/net/context" +) + +type counter struct { + sync.Mutex + max int + current int + queued int + dequeued int + enter int + exit int +} + +func (c *counter) up() { + c.Lock() + defer c.Unlock() + + c.current = c.current + 1 + if c.current > c.max { + c.max = c.current + } +} + +func (c *counter) down() { + c.Lock() + defer c.Unlock() + + c.current = c.current - 1 +} + +func (c *counter) Queued(ctx context.Context) { + c.Lock() + defer c.Unlock() + c.queued++ +} + +func (c *counter) Dequeued(ctx context.Context) { + c.Lock() + defer c.Unlock() + c.dequeued++ +} + +func (c *counter) Enter(ctx context.Context, acquireTime time.Duration) { + c.Lock() + defer c.Unlock() + c.enter++ +} + +func (c *counter) Exit(ctx context.Context) { + c.Lock() + defer c.Unlock() + c.exit++ +} + +func TestLimiter(t *testing.T) { + tests := []struct { + name string + concurrency int + maxConcurrency int + iterations int + delay time.Duration + buckets int + wantMaxRange []int + wantMonitorCalls bool + }{ + { + name: "single", + concurrency: 1, + maxConcurrency: 1, + iterations: 1, + delay: 1 * time.Millisecond, + buckets: 1, + wantMaxRange: []int{1, 1}, + wantMonitorCalls: true, + }, + { + name: "two-at-a-time", + concurrency: 100, + maxConcurrency: 2, + iterations: 10, + delay: 1 * time.Millisecond, + buckets: 1, + wantMaxRange: []int{2, 3}, + wantMonitorCalls: true, + }, + { + name: "two-by-two", + concurrency: 100, + maxConcurrency: 2, + delay: 1000 * time.Nanosecond, + iterations: 4, + buckets: 2, + wantMaxRange: []int{4, 5}, + wantMonitorCalls: true, + }, + { + name: "no-limit", + concurrency: 10, + maxConcurrency: 0, + iterations: 200, + delay: 1000 * time.Nanosecond, + buckets: 1, + wantMaxRange: []int{10, 10}, + wantMonitorCalls: false, + }, + { + name: "wide-spread", + concurrency: 1000, + maxConcurrency: 2, + delay: 100 * time.Nanosecond, + iterations: 40, + buckets: 50, + // Intentionally leaving the max low because CI runners + // may struggle to do 80 things in parallel + wantMaxRange: []int{80, 102}, + wantMonitorCalls: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gauge := &counter{} + + limiter := NewLimiter(tt.maxConcurrency, gauge) + wg := sync.WaitGroup{} + wg.Add(tt.concurrency) + + // We know of an edge case that can lead to the rate limiter + // occassionally letting one or two extra goroutines run + // concurrently. + for c := 0; c < tt.concurrency; c++ { + go func(counter int) { + for i := 0; i < tt.iterations; i++ { + lockKey := strconv.Itoa((i ^ counter) % tt.buckets) + + limiter.Limit(context.Background(), lockKey, func() (interface{}, error) { + gauge.up() + + assert.True(t, gauge.current <= tt.wantMaxRange[1], "Expected the number of concurrent operations (%v) to not exceed the maximum concurrency (%v)", gauge.current, tt.wantMaxRange[1]) + assert.True(t, limiter.countSemaphores() <= tt.buckets, "Expected the number of semaphores (%v) to be lte number of buckets (%v)", limiter.countSemaphores(), tt.buckets) + time.Sleep(tt.delay) + + gauge.down() + return nil, nil + }) + + time.Sleep(tt.delay) + } + + wg.Done() + }(c) + } + + wg.Wait() + assert.True(t, tt.wantMaxRange[0] <= gauge.max && gauge.max <= tt.wantMaxRange[1], "Expected maximum concurrency to be in the range [%v,%v] but got %v", tt.wantMaxRange[0], tt.wantMaxRange[1], gauge.max) + assert.Equal(t, 0, gauge.current) + assert.Equal(t, 0, limiter.countSemaphores()) + + var wantMonitorCallCount int + if tt.wantMonitorCalls { + wantMonitorCallCount = tt.concurrency * tt.iterations + } else { + wantMonitorCallCount = 0 + } + + assert.Equal(t, wantMonitorCallCount, gauge.enter) + assert.Equal(t, wantMonitorCallCount, gauge.exit) + assert.Equal(t, wantMonitorCallCount, gauge.queued) + assert.Equal(t, wantMonitorCallCount, gauge.dequeued) + }) + } +} diff --git a/internal/middleware/limithandler/limithandler.go b/internal/middleware/limithandler/limithandler.go new file mode 100644 index 000000000..bd924659b --- /dev/null +++ b/internal/middleware/limithandler/limithandler.go @@ -0,0 +1,95 @@ +package limithandler + +import ( + "github.com/grpc-ecosystem/go-grpc-middleware/tags" + "golang.org/x/net/context" + "google.golang.org/grpc" +) + +// LimiterMiddleware contains rate limiter state +type LimiterMiddleware struct { + methodLimiters map[string]*ConcurrencyLimiter +} + +var maxConcurrencyPerRepoPerRPC map[string]int + +func getRepoPath(ctx context.Context) string { + tags := grpc_ctxtags.Extract(ctx) + ctxValue := tags.Values()["grpc.request.repoPath"] + if ctxValue == nil { + return "" + } + + s, ok := ctxValue.(string) + if ok { + return s + } + + return "" +} + +// UnaryInterceptor returns a Unary Interceptor +func (c *LimiterMiddleware) UnaryInterceptor() grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + repoPath := getRepoPath(ctx) + if repoPath == "" { + return handler(ctx, req) + } + + limiter := c.methodLimiters[info.FullMethod] + if limiter == nil { + // No concurrency limiting + return handler(ctx, req) + } + + return limiter.Limit(ctx, repoPath, func() (interface{}, error) { + return handler(ctx, req) + }) + } +} + +// StreamInterceptor returns a Stream Interceptor +func (c *LimiterMiddleware) StreamInterceptor() grpc.StreamServerInterceptor { + return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + ctx := stream.Context() + + repoPath := getRepoPath(ctx) + if repoPath == "" { + return handler(srv, stream) + } + + limiter := c.methodLimiters[info.FullMethod] + if limiter == nil { + // No concurrency limiting + return handler(srv, stream) + } + + _, err := limiter.Limit(ctx, repoPath, func() (interface{}, error) { + return nil, handler(srv, stream) + }) + + return err + } +} + +// New creates a new rate limiter +func New() LimiterMiddleware { + return LimiterMiddleware{ + methodLimiters: createLimiterConfig(), + } +} + +func createLimiterConfig() map[string]*ConcurrencyLimiter { + result := make(map[string]*ConcurrencyLimiter) + + for fullMethodName, max := range maxConcurrencyPerRepoPerRPC { + result[fullMethodName] = NewLimiter(max, newPromMonitor(fullMethodName)) + } + + return result +} + +// SetMaxRepoConcurrency Configures the max concurrency per repo per RPC +func SetMaxRepoConcurrency(config map[string]int) { + maxConcurrencyPerRepoPerRPC = config +} diff --git a/internal/middleware/limithandler/metrics.go b/internal/middleware/limithandler/metrics.go new file mode 100644 index 000000000..bbb9ad533 --- /dev/null +++ b/internal/middleware/limithandler/metrics.go @@ -0,0 +1,113 @@ +package limithandler + +import ( + "strings" + "time" + + prom "github.com/prometheus/client_golang/prometheus" + + "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" + "golang.org/x/net/context" +) + +const acquireDurationLogThreshold = 10 * time.Millisecond + +var ( + histogramEnabled = false + histogramVec *prom.HistogramVec + inprogressGaugeVec = prom.NewGaugeVec( + prom.GaugeOpts{ + Namespace: "gitaly", + Subsystem: "rate_limiting", + Name: "in_progress", + Help: "Gauge of number of number of concurrent invocations currently in progress for this endpoint", + }, + []string{"grpc_service", "grpc_method"}, + ) + + queuedGaugeVec = prom.NewGaugeVec( + prom.GaugeOpts{ + Namespace: "gitaly", + Subsystem: "rate_limiting", + Name: "queued", + Help: "Gauge of number of number of invocations currently queued for this endpoint", + }, + []string{"grpc_service", "grpc_method"}, + ) +) + +type promMonitor struct { + queuedGauge prom.Gauge + inprogressGauge prom.Gauge + histogram prom.Histogram +} + +func init() { + prom.MustRegister(inprogressGaugeVec, queuedGaugeVec) +} + +func splitMethodName(fullMethodName string) (string, string) { + fullMethodName = strings.TrimPrefix(fullMethodName, "/") // remove leading slash + if i := strings.Index(fullMethodName, "/"); i >= 0 { + return fullMethodName[:i], fullMethodName[i+1:] + } + return "unknown", "unknown" +} + +// EnableAcquireTimeHistogram enables histograms for acquisition times +func EnableAcquireTimeHistogram(buckets []float64) { + histogramEnabled = true + histogramOpts := prom.HistogramOpts{ + Namespace: "gitaly", + Subsystem: "rate_limiting", + Name: "acquiring_seconds", + Help: "Histogram of lock acquisition latency (seconds) for endpoint rate limiting", + Buckets: buckets, + } + + histogramVec = prom.NewHistogramVec( + histogramOpts, + []string{"grpc_service", "grpc_method"}, + ) + + prom.Register(histogramVec) +} + +func (c *promMonitor) Queued(ctx context.Context) { + c.queuedGauge.Inc() +} + +func (c *promMonitor) Dequeued(ctx context.Context) { + c.queuedGauge.Dec() +} + +func (c *promMonitor) Enter(ctx context.Context, acquireTime time.Duration) { + c.inprogressGauge.Inc() + + if acquireTime > acquireDurationLogThreshold { + logger := grpc_logrus.Extract(ctx) + logger.WithField("acquire_ms", acquireTime.Seconds()*1000).Info("Rate limit acquire wait") + } + + if c.histogram != nil { + c.histogram.Observe(acquireTime.Seconds()) + } +} + +func (c *promMonitor) Exit(ctx context.Context) { + c.inprogressGauge.Dec() +} + +func newPromMonitor(fullMethod string) ConcurrencyMonitor { + serviceName, methodName := splitMethodName(fullMethod) + + queuedGauge := queuedGaugeVec.WithLabelValues(serviceName, methodName) + inprogressGauge := inprogressGaugeVec.WithLabelValues(serviceName, methodName) + + var histogram prom.Histogram + if histogramVec != nil { + histogram = histogramVec.WithLabelValues(serviceName, methodName) + } + + return &promMonitor{queuedGauge, inprogressGauge, histogram} +} diff --git a/internal/server/server.go b/internal/server/server.go index 4690bb5c3..33d8da3fd 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -5,6 +5,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/helper/fieldextractors" "gitlab.com/gitlab-org/gitaly/internal/middleware/cancelhandler" + "gitlab.com/gitlab-org/gitaly/internal/middleware/limithandler" "gitlab.com/gitlab-org/gitaly/internal/middleware/panichandler" "gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler" "gitlab.com/gitlab-org/gitaly/internal/rubyserver" @@ -28,6 +29,8 @@ func New(rubyServer *rubyserver.Server) *grpc.Server { grpc_ctxtags.WithFieldExtractor(fieldextractors.RepositoryFieldExtractor), } + lh := limithandler.New() + server := grpc.NewServer( grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( grpc_ctxtags.StreamServerInterceptor(ctxTagOpts...), @@ -35,6 +38,7 @@ func New(rubyServer *rubyserver.Server) *grpc.Server { grpc_logrus.StreamServerInterceptor(logrusEntry), sentryhandler.StreamLogHandler, cancelhandler.Stream, // Should be below LogHandler + lh.StreamInterceptor(), authStreamServerInterceptor(), // Panic handler should remain last so that application panics will be // converted to errors and logged @@ -46,6 +50,7 @@ func New(rubyServer *rubyserver.Server) *grpc.Server { grpc_logrus.UnaryServerInterceptor(logrusEntry), sentryhandler.UnaryLogHandler, cancelhandler.Unary, // Should be below LogHandler + lh.UnaryInterceptor(), authUnaryServerInterceptor(), // Panic handler should remain last so that application panics will be // converted to errors and logged diff --git a/vendor/golang.org/x/sync/LICENSE b/vendor/golang.org/x/sync/LICENSE new file mode 100644 index 000000000..6a66aea5e --- /dev/null +++ b/vendor/golang.org/x/sync/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2009 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/golang.org/x/sync/PATENTS b/vendor/golang.org/x/sync/PATENTS new file mode 100644 index 000000000..733099041 --- /dev/null +++ b/vendor/golang.org/x/sync/PATENTS @@ -0,0 +1,22 @@ +Additional IP Rights Grant (Patents) + +"This implementation" means the copyrightable works distributed by +Google as part of the Go project. + +Google hereby grants to You a perpetual, worldwide, non-exclusive, +no-charge, royalty-free, irrevocable (except as stated in this section) +patent license to make, have made, use, offer to sell, sell, import, +transfer and otherwise run, modify and propagate the contents of this +implementation of Go, where such license applies only to those patent +claims, both currently owned or controlled by Google and acquired in +the future, licensable by Google that are necessarily infringed by this +implementation of Go. This grant does not include claims that would be +infringed only as a consequence of further modification of this +implementation. If you or your agent or exclusive licensee institute or +order or agree to the institution of patent litigation against any +entity (including a cross-claim or counterclaim in a lawsuit) alleging +that this implementation of Go or any code incorporated within this +implementation of Go constitutes direct or contributory patent +infringement, or inducement of patent infringement, then any patent +rights granted to you under this License for this implementation of Go +shall terminate as of the date such litigation is filed. diff --git a/vendor/golang.org/x/sync/semaphore/semaphore.go b/vendor/golang.org/x/sync/semaphore/semaphore.go new file mode 100644 index 000000000..e9d2d79a9 --- /dev/null +++ b/vendor/golang.org/x/sync/semaphore/semaphore.go @@ -0,0 +1,131 @@ +// Copyright 2017 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package semaphore provides a weighted semaphore implementation. +package semaphore // import "golang.org/x/sync/semaphore" + +import ( + "container/list" + "sync" + + // Use the old context because packages that depend on this one + // (e.g. cloud.google.com/go/...) must run on Go 1.6. + // TODO(jba): update to "context" when possible. + "golang.org/x/net/context" +) + +type waiter struct { + n int64 + ready chan<- struct{} // Closed when semaphore acquired. +} + +// NewWeighted creates a new weighted semaphore with the given +// maximum combined weight for concurrent access. +func NewWeighted(n int64) *Weighted { + w := &Weighted{size: n} + return w +} + +// Weighted provides a way to bound concurrent access to a resource. +// The callers can request access with a given weight. +type Weighted struct { + size int64 + cur int64 + mu sync.Mutex + waiters list.List +} + +// Acquire acquires the semaphore with a weight of n, blocking only until ctx +// is done. On success, returns nil. On failure, returns ctx.Err() and leaves +// the semaphore unchanged. +// +// If ctx is already done, Acquire may still succeed without blocking. +func (s *Weighted) Acquire(ctx context.Context, n int64) error { + s.mu.Lock() + if s.size-s.cur >= n && s.waiters.Len() == 0 { + s.cur += n + s.mu.Unlock() + return nil + } + + if n > s.size { + // Don't make other Acquire calls block on one that's doomed to fail. + s.mu.Unlock() + <-ctx.Done() + return ctx.Err() + } + + ready := make(chan struct{}) + w := waiter{n: n, ready: ready} + elem := s.waiters.PushBack(w) + s.mu.Unlock() + + select { + case <-ctx.Done(): + err := ctx.Err() + s.mu.Lock() + select { + case <-ready: + // Acquired the semaphore after we were canceled. Rather than trying to + // fix up the queue, just pretend we didn't notice the cancelation. + err = nil + default: + s.waiters.Remove(elem) + } + s.mu.Unlock() + return err + + case <-ready: + return nil + } +} + +// TryAcquire acquires the semaphore with a weight of n without blocking. +// On success, returns true. On failure, returns false and leaves the semaphore unchanged. +func (s *Weighted) TryAcquire(n int64) bool { + s.mu.Lock() + success := s.size-s.cur >= n && s.waiters.Len() == 0 + if success { + s.cur += n + } + s.mu.Unlock() + return success +} + +// Release releases the semaphore with a weight of n. +func (s *Weighted) Release(n int64) { + s.mu.Lock() + s.cur -= n + if s.cur < 0 { + s.mu.Unlock() + panic("semaphore: bad release") + } + for { + next := s.waiters.Front() + if next == nil { + break // No more waiters blocked. + } + + w := next.Value.(waiter) + if s.size-s.cur < w.n { + // Not enough tokens for the next waiter. We could keep going (to try to + // find a waiter with a smaller request), but under load that could cause + // starvation for large requests; instead, we leave all remaining waiters + // blocked. + // + // Consider a semaphore used as a read-write lock, with N tokens, N + // readers, and one writer. Each reader can Acquire(1) to obtain a read + // lock. The writer can Acquire(N) to obtain a write lock, excluding all + // of the readers. If we allow the readers to jump ahead in the queue, + // the writer will starve — there is always one token available for every + // reader. + break + } + + s.cur += w.n + s.waiters.Remove(next) + close(w.ready) + } + s.mu.Unlock() +} diff --git a/vendor/vendor.json b/vendor/vendor.json index 7520e4962..9e4f072b7 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -257,6 +257,12 @@ "revisionTime": "2017-06-03T08:13:02Z" }, { + "checksumSHA1": "vcN67ZjTbGpLLwSghHCbAEvmzMo=", + "path": "golang.org/x/sync/semaphore", + "revision": "f52d1811a62927559de87708c8913c1650ce4f26", + "revisionTime": "2017-05-17T20:25:26Z" + }, + { "checksumSHA1": "wxrHmKhFznZZAjrYK5/nWn+fZGc=", "path": "golang.org/x/sys/unix", "revision": "dbc2be9168a660ef302e04b6ff6406de6f967473", |