Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Newdigate <andrew@gitlab.com>2017-09-30 00:19:50 +0300
committerAndrew Newdigate <andrew@gitlab.com>2017-09-30 00:19:50 +0300
commitd1f47d9ff0824fe007d3525b3a87df311249dd51 (patch)
treedb8ed9e33d03a2b4c649dc247ed003918e20414c
parent409fb4e5f38e135fb16bb2850d3513df832d6e0b (diff)
parentbb849b4c136c7085e8b7b85126ec12f5caec0a21 (diff)
Merge branch 'rate-limiter' into 'master'
Rate limiter See merge request gitlab-org/gitaly!376
-rw-r--r--CHANGELOG.md2
-rw-r--r--NOTICE53
-rw-r--r--cmd/gitaly/main.go1
-rw-r--r--config.toml.example5
-rw-r--r--internal/config/concurrency.go16
-rw-r--r--internal/config/config.go27
-rw-r--r--internal/config/prometheus.go3
-rw-r--r--internal/middleware/limithandler/concurrency_limiter.go127
-rw-r--r--internal/middleware/limithandler/concurrency_limiter_test.go181
-rw-r--r--internal/middleware/limithandler/limithandler.go95
-rw-r--r--internal/middleware/limithandler/metrics.go113
-rw-r--r--internal/server/server.go5
-rw-r--r--vendor/golang.org/x/sync/LICENSE27
-rw-r--r--vendor/golang.org/x/sync/PATENTS22
-rw-r--r--vendor/golang.org/x/sync/semaphore/semaphore.go131
-rw-r--r--vendor/vendor.json6
16 files changed, 804 insertions, 10 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index bcc5e33d8..7108ff2a8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,8 @@
UNRELEASED
+- Allow individual endpoints to be rate-limited per-repository
+ https://gitlab.com/gitlab-org/gitaly/merge_requests/376
- Implement OperationService.UserDeleteBranch RPC
https://gitlab.com/gitlab-org/gitaly/merge_requests/377
- Fix path bug in CommitService::FindCommits
diff --git a/NOTICE b/NOTICE
index 02b6750e4..6441e74d7 100644
--- a/NOTICE
+++ b/NOTICE
@@ -1927,6 +1927,59 @@ infringement, or inducement of patent infringement, then any patent
rights granted to you under this License for this implementation of Go
shall terminate as of the date such litigation is filed.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+LICENSE - gitlab.com/gitlab-org/gitaly/vendor/golang.org/x/sync
+Copyright (c) 2009 The Go Authors. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+ * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+ * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+PATENTS - gitlab.com/gitlab-org/gitaly/vendor/golang.org/x/sync
+Additional IP Rights Grant (Patents)
+
+"This implementation" means the copyrightable works distributed by
+Google as part of the Go project.
+
+Google hereby grants to You a perpetual, worldwide, non-exclusive,
+no-charge, royalty-free, irrevocable (except as stated in this section)
+patent license to make, have made, use, offer to sell, sell, import,
+transfer and otherwise run, modify and propagate the contents of this
+implementation of Go, where such license applies only to those patent
+claims, both currently owned or controlled by Google and acquired in
+the future, licensable by Google that are necessarily infringed by this
+implementation of Go. This grant does not include claims that would be
+infringed only as a consequence of further modification of this
+implementation. If you or your agent or exclusive licensee institute or
+order or agree to the institution of patent litigation against any
+entity (including a cross-claim or counterclaim in a lawsuit) alleging
+that this implementation of Go or any code incorporated within this
+implementation of Go constitutes direct or contributory patent
+infringement, or inducement of patent infringement, then any patent
+rights granted to you under this License for this implementation of Go
+shall terminate as of the date such litigation is filed.
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
LICENSE - gitlab.com/gitlab-org/gitaly/vendor/golang.org/x/sys
Copyright (c) 2009 The Go Authors. All rights reserved.
diff --git a/cmd/gitaly/main.go b/cmd/gitaly/main.go
index 348c627d7..1ba31c693 100644
--- a/cmd/gitaly/main.go
+++ b/cmd/gitaly/main.go
@@ -92,6 +92,7 @@ func main() {
config.ConfigureLogging()
config.ConfigureSentry(version.GetVersion())
config.ConfigurePrometheus()
+ config.ConfigureConcurrencyLimits()
var listeners []net.Listener
diff --git a/config.toml.example b/config.toml.example
index 5fe394217..69c813c4f 100644
--- a/config.toml.example
+++ b/config.toml.example
@@ -42,3 +42,8 @@ dir = "/home/git/gitaly/ruby"
[gitlab-shell]
# The directory where gitlab-shell is installed
dir = "/home/git/gitlab-shell"
+
+# # You can adjust the concurrency of each RPC endpoint
+# [[concurrency]]
+# rpc = "/gitaly.RepositoryService/GarbageCollect"
+# max_per_repo = 1
diff --git a/internal/config/concurrency.go b/internal/config/concurrency.go
new file mode 100644
index 000000000..7211bd3e1
--- /dev/null
+++ b/internal/config/concurrency.go
@@ -0,0 +1,16 @@
+package config
+
+import (
+ "gitlab.com/gitlab-org/gitaly/internal/middleware/limithandler"
+)
+
+// ConfigureConcurrencyLimits configures the per-repo, per RPC rate limits
+func ConfigureConcurrencyLimits() {
+ maxConcurrencyPerRepoPerRPC := make(map[string]int)
+
+ for _, v := range Config.Concurrency {
+ maxConcurrencyPerRepoPerRPC[v.RPC] = v.MaxPerRepo
+ }
+
+ limithandler.SetMaxRepoConcurrency(maxConcurrencyPerRepoPerRPC)
+}
diff --git a/internal/config/config.go b/internal/config/config.go
index 61e0705fe..d1eac0ad5 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -19,16 +19,17 @@ var (
)
type config struct {
- SocketPath string `toml:"socket_path" split_words:"true"`
- ListenAddr string `toml:"listen_addr" split_words:"true"`
- PrometheusListenAddr string `toml:"prometheus_listen_addr" split_words:"true"`
- Git Git `toml:"git" envconfig:"git"`
- Storages []Storage `toml:"storage" envconfig:"storage"`
- Logging Logging `toml:"logging" envconfig:"logging"`
- Prometheus Prometheus `toml:"prometheus"`
- Auth Auth `toml:"auth"`
- Ruby Ruby `toml:"gitaly-ruby"`
- GitlabShell GitlabShell `toml:"gitlab-shell"`
+ SocketPath string `toml:"socket_path" split_words:"true"`
+ ListenAddr string `toml:"listen_addr" split_words:"true"`
+ PrometheusListenAddr string `toml:"prometheus_listen_addr" split_words:"true"`
+ Git Git `toml:"git" envconfig:"git"`
+ Storages []Storage `toml:"storage" envconfig:"storage"`
+ Logging Logging `toml:"logging" envconfig:"logging"`
+ Prometheus Prometheus `toml:"prometheus"`
+ Auth Auth `toml:"auth"`
+ Ruby Ruby `toml:"gitaly-ruby"`
+ GitlabShell GitlabShell `toml:"gitlab-shell"`
+ Concurrency []Concurrency `toml:"concurrency"`
}
// GitlabShell contains the settings required for executing `gitlab-shell`
@@ -58,6 +59,12 @@ type Prometheus struct {
GRPCLatencyBuckets []float64 `toml:"grpc_latency_buckets"`
}
+// Concurrency allows endpoints to be limited to a maximum concurrency per repo
+type Concurrency struct {
+ RPC string `toml:"rpc"`
+ MaxPerRepo int `toml:"max_per_repo"`
+}
+
// Load initializes the Config variable from file and the environment.
// Environment variables take precedence over the file.
func Load(file io.Reader) error {
diff --git a/internal/config/prometheus.go b/internal/config/prometheus.go
index eb5dacc5b..2d8936f9a 100644
--- a/internal/config/prometheus.go
+++ b/internal/config/prometheus.go
@@ -4,6 +4,7 @@ import (
"github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/internal/middleware/limithandler"
)
// ConfigurePrometheus uses the global configuration to configure prometheus
@@ -17,4 +18,6 @@ func ConfigurePrometheus() {
grpc_prometheus.EnableHandlingTimeHistogram(func(histogramOpts *prometheus.HistogramOpts) {
histogramOpts.Buckets = Config.Prometheus.GRPCLatencyBuckets
})
+
+ limithandler.EnableAcquireTimeHistogram(Config.Prometheus.GRPCLatencyBuckets)
}
diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go
new file mode 100644
index 000000000..56c67ae89
--- /dev/null
+++ b/internal/middleware/limithandler/concurrency_limiter.go
@@ -0,0 +1,127 @@
+package limithandler
+
+import (
+ "sync"
+ "time"
+
+ "golang.org/x/net/context"
+ "golang.org/x/sync/semaphore"
+)
+
+// LimitedFunc represents a function that will be limited
+type LimitedFunc func() (resp interface{}, err error)
+
+// ConcurrencyMonitor allows the concurrency monitor to be observed
+type ConcurrencyMonitor interface {
+ Queued(ctx context.Context)
+ Dequeued(ctx context.Context)
+ Enter(ctx context.Context, acquireTime time.Duration)
+ Exit(ctx context.Context)
+}
+
+// ConcurrencyLimiter contains rate limiter state
+type ConcurrencyLimiter struct {
+ // A weighted semaphore is like a mutex, but with a number of 'slots'.
+ // When locking the locker requests 1 or more slots to be locked.
+ // In this package, the number of slots is the number of concurrent requests the rate limiter lets through.
+ // https://godoc.org/golang.org/x/sync/semaphore
+ semaphores map[string]*semaphore.Weighted
+ max int64
+ mux *sync.Mutex
+ monitor ConcurrencyMonitor
+}
+
+// Lazy create a semaphore for the given key
+func (c *ConcurrencyLimiter) getSemaphore(lockKey string) *semaphore.Weighted {
+ c.mux.Lock()
+ defer c.mux.Unlock()
+
+ ws := c.semaphores[lockKey]
+ if ws != nil {
+ return ws
+ }
+
+ w := semaphore.NewWeighted(c.max)
+ c.semaphores[lockKey] = w
+ return w
+}
+
+func (c *ConcurrencyLimiter) attemptCollection(lockKey string) {
+ c.mux.Lock()
+ defer c.mux.Unlock()
+
+ ws := c.semaphores[lockKey]
+ if ws == nil {
+ return
+ }
+
+ if !ws.TryAcquire(c.max) {
+ return
+ }
+
+ // By releasing, we prevent a lockup of goroutines that have already
+ // acquired the semaphore, but have yet to acquire on it
+ ws.Release(c.max)
+
+ // If we managed to acquire all the locks, we can remove the semaphore for this key
+ delete(c.semaphores, lockKey)
+}
+
+func (c *ConcurrencyLimiter) countSemaphores() int {
+ c.mux.Lock()
+ defer c.mux.Unlock()
+
+ return len(c.semaphores)
+}
+
+// Limit will limit the concurrency of f
+func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey string, f LimitedFunc) (interface{}, error) {
+ if c.max <= 0 {
+ return f()
+ }
+
+ start := time.Now()
+ c.monitor.Queued(ctx)
+
+ w := c.getSemaphore(lockKey)
+
+ // Attempt to cleanup the semaphore it's no longer being used
+ defer c.attemptCollection(lockKey)
+
+ err := w.Acquire(ctx, 1)
+ c.monitor.Dequeued(ctx)
+
+ if err != nil {
+ return nil, err
+ }
+
+ c.monitor.Enter(ctx, time.Since(start))
+ defer c.monitor.Exit(ctx)
+
+ defer w.Release(1)
+
+ resp, err := f()
+
+ return resp, err
+}
+
+// NewLimiter creates a new rate limiter
+func NewLimiter(max int, monitor ConcurrencyMonitor) *ConcurrencyLimiter {
+ if monitor == nil {
+ monitor = &nullConcurrencyMonitor{}
+ }
+
+ return &ConcurrencyLimiter{
+ semaphores: make(map[string]*semaphore.Weighted),
+ max: int64(max),
+ mux: &sync.Mutex{},
+ monitor: monitor,
+ }
+}
+
+type nullConcurrencyMonitor struct{}
+
+func (c *nullConcurrencyMonitor) Queued(ctx context.Context) {}
+func (c *nullConcurrencyMonitor) Dequeued(ctx context.Context) {}
+func (c *nullConcurrencyMonitor) Enter(ctx context.Context, acquireTime time.Duration) {}
+func (c *nullConcurrencyMonitor) Exit(ctx context.Context) {}
diff --git a/internal/middleware/limithandler/concurrency_limiter_test.go b/internal/middleware/limithandler/concurrency_limiter_test.go
new file mode 100644
index 000000000..f045c2c31
--- /dev/null
+++ b/internal/middleware/limithandler/concurrency_limiter_test.go
@@ -0,0 +1,181 @@
+package limithandler
+
+import (
+ "strconv"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+
+ "golang.org/x/net/context"
+)
+
+type counter struct {
+ sync.Mutex
+ max int
+ current int
+ queued int
+ dequeued int
+ enter int
+ exit int
+}
+
+func (c *counter) up() {
+ c.Lock()
+ defer c.Unlock()
+
+ c.current = c.current + 1
+ if c.current > c.max {
+ c.max = c.current
+ }
+}
+
+func (c *counter) down() {
+ c.Lock()
+ defer c.Unlock()
+
+ c.current = c.current - 1
+}
+
+func (c *counter) Queued(ctx context.Context) {
+ c.Lock()
+ defer c.Unlock()
+ c.queued++
+}
+
+func (c *counter) Dequeued(ctx context.Context) {
+ c.Lock()
+ defer c.Unlock()
+ c.dequeued++
+}
+
+func (c *counter) Enter(ctx context.Context, acquireTime time.Duration) {
+ c.Lock()
+ defer c.Unlock()
+ c.enter++
+}
+
+func (c *counter) Exit(ctx context.Context) {
+ c.Lock()
+ defer c.Unlock()
+ c.exit++
+}
+
+func TestLimiter(t *testing.T) {
+ tests := []struct {
+ name string
+ concurrency int
+ maxConcurrency int
+ iterations int
+ delay time.Duration
+ buckets int
+ wantMaxRange []int
+ wantMonitorCalls bool
+ }{
+ {
+ name: "single",
+ concurrency: 1,
+ maxConcurrency: 1,
+ iterations: 1,
+ delay: 1 * time.Millisecond,
+ buckets: 1,
+ wantMaxRange: []int{1, 1},
+ wantMonitorCalls: true,
+ },
+ {
+ name: "two-at-a-time",
+ concurrency: 100,
+ maxConcurrency: 2,
+ iterations: 10,
+ delay: 1 * time.Millisecond,
+ buckets: 1,
+ wantMaxRange: []int{2, 3},
+ wantMonitorCalls: true,
+ },
+ {
+ name: "two-by-two",
+ concurrency: 100,
+ maxConcurrency: 2,
+ delay: 1000 * time.Nanosecond,
+ iterations: 4,
+ buckets: 2,
+ wantMaxRange: []int{4, 5},
+ wantMonitorCalls: true,
+ },
+ {
+ name: "no-limit",
+ concurrency: 10,
+ maxConcurrency: 0,
+ iterations: 200,
+ delay: 1000 * time.Nanosecond,
+ buckets: 1,
+ wantMaxRange: []int{10, 10},
+ wantMonitorCalls: false,
+ },
+ {
+ name: "wide-spread",
+ concurrency: 1000,
+ maxConcurrency: 2,
+ delay: 100 * time.Nanosecond,
+ iterations: 40,
+ buckets: 50,
+ // Intentionally leaving the max low because CI runners
+ // may struggle to do 80 things in parallel
+ wantMaxRange: []int{80, 102},
+ wantMonitorCalls: true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ gauge := &counter{}
+
+ limiter := NewLimiter(tt.maxConcurrency, gauge)
+ wg := sync.WaitGroup{}
+ wg.Add(tt.concurrency)
+
+ // We know of an edge case that can lead to the rate limiter
+ // occassionally letting one or two extra goroutines run
+ // concurrently.
+ for c := 0; c < tt.concurrency; c++ {
+ go func(counter int) {
+ for i := 0; i < tt.iterations; i++ {
+ lockKey := strconv.Itoa((i ^ counter) % tt.buckets)
+
+ limiter.Limit(context.Background(), lockKey, func() (interface{}, error) {
+ gauge.up()
+
+ assert.True(t, gauge.current <= tt.wantMaxRange[1], "Expected the number of concurrent operations (%v) to not exceed the maximum concurrency (%v)", gauge.current, tt.wantMaxRange[1])
+ assert.True(t, limiter.countSemaphores() <= tt.buckets, "Expected the number of semaphores (%v) to be lte number of buckets (%v)", limiter.countSemaphores(), tt.buckets)
+ time.Sleep(tt.delay)
+
+ gauge.down()
+ return nil, nil
+ })
+
+ time.Sleep(tt.delay)
+ }
+
+ wg.Done()
+ }(c)
+ }
+
+ wg.Wait()
+ assert.True(t, tt.wantMaxRange[0] <= gauge.max && gauge.max <= tt.wantMaxRange[1], "Expected maximum concurrency to be in the range [%v,%v] but got %v", tt.wantMaxRange[0], tt.wantMaxRange[1], gauge.max)
+ assert.Equal(t, 0, gauge.current)
+ assert.Equal(t, 0, limiter.countSemaphores())
+
+ var wantMonitorCallCount int
+ if tt.wantMonitorCalls {
+ wantMonitorCallCount = tt.concurrency * tt.iterations
+ } else {
+ wantMonitorCallCount = 0
+ }
+
+ assert.Equal(t, wantMonitorCallCount, gauge.enter)
+ assert.Equal(t, wantMonitorCallCount, gauge.exit)
+ assert.Equal(t, wantMonitorCallCount, gauge.queued)
+ assert.Equal(t, wantMonitorCallCount, gauge.dequeued)
+ })
+ }
+}
diff --git a/internal/middleware/limithandler/limithandler.go b/internal/middleware/limithandler/limithandler.go
new file mode 100644
index 000000000..bd924659b
--- /dev/null
+++ b/internal/middleware/limithandler/limithandler.go
@@ -0,0 +1,95 @@
+package limithandler
+
+import (
+ "github.com/grpc-ecosystem/go-grpc-middleware/tags"
+ "golang.org/x/net/context"
+ "google.golang.org/grpc"
+)
+
+// LimiterMiddleware contains rate limiter state
+type LimiterMiddleware struct {
+ methodLimiters map[string]*ConcurrencyLimiter
+}
+
+var maxConcurrencyPerRepoPerRPC map[string]int
+
+func getRepoPath(ctx context.Context) string {
+ tags := grpc_ctxtags.Extract(ctx)
+ ctxValue := tags.Values()["grpc.request.repoPath"]
+ if ctxValue == nil {
+ return ""
+ }
+
+ s, ok := ctxValue.(string)
+ if ok {
+ return s
+ }
+
+ return ""
+}
+
+// UnaryInterceptor returns a Unary Interceptor
+func (c *LimiterMiddleware) UnaryInterceptor() grpc.UnaryServerInterceptor {
+ return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
+ repoPath := getRepoPath(ctx)
+ if repoPath == "" {
+ return handler(ctx, req)
+ }
+
+ limiter := c.methodLimiters[info.FullMethod]
+ if limiter == nil {
+ // No concurrency limiting
+ return handler(ctx, req)
+ }
+
+ return limiter.Limit(ctx, repoPath, func() (interface{}, error) {
+ return handler(ctx, req)
+ })
+ }
+}
+
+// StreamInterceptor returns a Stream Interceptor
+func (c *LimiterMiddleware) StreamInterceptor() grpc.StreamServerInterceptor {
+ return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+ ctx := stream.Context()
+
+ repoPath := getRepoPath(ctx)
+ if repoPath == "" {
+ return handler(srv, stream)
+ }
+
+ limiter := c.methodLimiters[info.FullMethod]
+ if limiter == nil {
+ // No concurrency limiting
+ return handler(srv, stream)
+ }
+
+ _, err := limiter.Limit(ctx, repoPath, func() (interface{}, error) {
+ return nil, handler(srv, stream)
+ })
+
+ return err
+ }
+}
+
+// New creates a new rate limiter
+func New() LimiterMiddleware {
+ return LimiterMiddleware{
+ methodLimiters: createLimiterConfig(),
+ }
+}
+
+func createLimiterConfig() map[string]*ConcurrencyLimiter {
+ result := make(map[string]*ConcurrencyLimiter)
+
+ for fullMethodName, max := range maxConcurrencyPerRepoPerRPC {
+ result[fullMethodName] = NewLimiter(max, newPromMonitor(fullMethodName))
+ }
+
+ return result
+}
+
+// SetMaxRepoConcurrency Configures the max concurrency per repo per RPC
+func SetMaxRepoConcurrency(config map[string]int) {
+ maxConcurrencyPerRepoPerRPC = config
+}
diff --git a/internal/middleware/limithandler/metrics.go b/internal/middleware/limithandler/metrics.go
new file mode 100644
index 000000000..bbb9ad533
--- /dev/null
+++ b/internal/middleware/limithandler/metrics.go
@@ -0,0 +1,113 @@
+package limithandler
+
+import (
+ "strings"
+ "time"
+
+ prom "github.com/prometheus/client_golang/prometheus"
+
+ "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
+ "golang.org/x/net/context"
+)
+
+const acquireDurationLogThreshold = 10 * time.Millisecond
+
+var (
+ histogramEnabled = false
+ histogramVec *prom.HistogramVec
+ inprogressGaugeVec = prom.NewGaugeVec(
+ prom.GaugeOpts{
+ Namespace: "gitaly",
+ Subsystem: "rate_limiting",
+ Name: "in_progress",
+ Help: "Gauge of number of number of concurrent invocations currently in progress for this endpoint",
+ },
+ []string{"grpc_service", "grpc_method"},
+ )
+
+ queuedGaugeVec = prom.NewGaugeVec(
+ prom.GaugeOpts{
+ Namespace: "gitaly",
+ Subsystem: "rate_limiting",
+ Name: "queued",
+ Help: "Gauge of number of number of invocations currently queued for this endpoint",
+ },
+ []string{"grpc_service", "grpc_method"},
+ )
+)
+
+type promMonitor struct {
+ queuedGauge prom.Gauge
+ inprogressGauge prom.Gauge
+ histogram prom.Histogram
+}
+
+func init() {
+ prom.MustRegister(inprogressGaugeVec, queuedGaugeVec)
+}
+
+func splitMethodName(fullMethodName string) (string, string) {
+ fullMethodName = strings.TrimPrefix(fullMethodName, "/") // remove leading slash
+ if i := strings.Index(fullMethodName, "/"); i >= 0 {
+ return fullMethodName[:i], fullMethodName[i+1:]
+ }
+ return "unknown", "unknown"
+}
+
+// EnableAcquireTimeHistogram enables histograms for acquisition times
+func EnableAcquireTimeHistogram(buckets []float64) {
+ histogramEnabled = true
+ histogramOpts := prom.HistogramOpts{
+ Namespace: "gitaly",
+ Subsystem: "rate_limiting",
+ Name: "acquiring_seconds",
+ Help: "Histogram of lock acquisition latency (seconds) for endpoint rate limiting",
+ Buckets: buckets,
+ }
+
+ histogramVec = prom.NewHistogramVec(
+ histogramOpts,
+ []string{"grpc_service", "grpc_method"},
+ )
+
+ prom.Register(histogramVec)
+}
+
+func (c *promMonitor) Queued(ctx context.Context) {
+ c.queuedGauge.Inc()
+}
+
+func (c *promMonitor) Dequeued(ctx context.Context) {
+ c.queuedGauge.Dec()
+}
+
+func (c *promMonitor) Enter(ctx context.Context, acquireTime time.Duration) {
+ c.inprogressGauge.Inc()
+
+ if acquireTime > acquireDurationLogThreshold {
+ logger := grpc_logrus.Extract(ctx)
+ logger.WithField("acquire_ms", acquireTime.Seconds()*1000).Info("Rate limit acquire wait")
+ }
+
+ if c.histogram != nil {
+ c.histogram.Observe(acquireTime.Seconds())
+ }
+}
+
+func (c *promMonitor) Exit(ctx context.Context) {
+ c.inprogressGauge.Dec()
+}
+
+func newPromMonitor(fullMethod string) ConcurrencyMonitor {
+ serviceName, methodName := splitMethodName(fullMethod)
+
+ queuedGauge := queuedGaugeVec.WithLabelValues(serviceName, methodName)
+ inprogressGauge := inprogressGaugeVec.WithLabelValues(serviceName, methodName)
+
+ var histogram prom.Histogram
+ if histogramVec != nil {
+ histogram = histogramVec.WithLabelValues(serviceName, methodName)
+ }
+
+ return &promMonitor{queuedGauge, inprogressGauge, histogram}
+}
diff --git a/internal/server/server.go b/internal/server/server.go
index 4690bb5c3..33d8da3fd 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -5,6 +5,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/helper/fieldextractors"
"gitlab.com/gitlab-org/gitaly/internal/middleware/cancelhandler"
+ "gitlab.com/gitlab-org/gitaly/internal/middleware/limithandler"
"gitlab.com/gitlab-org/gitaly/internal/middleware/panichandler"
"gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler"
"gitlab.com/gitlab-org/gitaly/internal/rubyserver"
@@ -28,6 +29,8 @@ func New(rubyServer *rubyserver.Server) *grpc.Server {
grpc_ctxtags.WithFieldExtractor(fieldextractors.RepositoryFieldExtractor),
}
+ lh := limithandler.New()
+
server := grpc.NewServer(
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
grpc_ctxtags.StreamServerInterceptor(ctxTagOpts...),
@@ -35,6 +38,7 @@ func New(rubyServer *rubyserver.Server) *grpc.Server {
grpc_logrus.StreamServerInterceptor(logrusEntry),
sentryhandler.StreamLogHandler,
cancelhandler.Stream, // Should be below LogHandler
+ lh.StreamInterceptor(),
authStreamServerInterceptor(),
// Panic handler should remain last so that application panics will be
// converted to errors and logged
@@ -46,6 +50,7 @@ func New(rubyServer *rubyserver.Server) *grpc.Server {
grpc_logrus.UnaryServerInterceptor(logrusEntry),
sentryhandler.UnaryLogHandler,
cancelhandler.Unary, // Should be below LogHandler
+ lh.UnaryInterceptor(),
authUnaryServerInterceptor(),
// Panic handler should remain last so that application panics will be
// converted to errors and logged
diff --git a/vendor/golang.org/x/sync/LICENSE b/vendor/golang.org/x/sync/LICENSE
new file mode 100644
index 000000000..6a66aea5e
--- /dev/null
+++ b/vendor/golang.org/x/sync/LICENSE
@@ -0,0 +1,27 @@
+Copyright (c) 2009 The Go Authors. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+ * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+ * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/vendor/golang.org/x/sync/PATENTS b/vendor/golang.org/x/sync/PATENTS
new file mode 100644
index 000000000..733099041
--- /dev/null
+++ b/vendor/golang.org/x/sync/PATENTS
@@ -0,0 +1,22 @@
+Additional IP Rights Grant (Patents)
+
+"This implementation" means the copyrightable works distributed by
+Google as part of the Go project.
+
+Google hereby grants to You a perpetual, worldwide, non-exclusive,
+no-charge, royalty-free, irrevocable (except as stated in this section)
+patent license to make, have made, use, offer to sell, sell, import,
+transfer and otherwise run, modify and propagate the contents of this
+implementation of Go, where such license applies only to those patent
+claims, both currently owned or controlled by Google and acquired in
+the future, licensable by Google that are necessarily infringed by this
+implementation of Go. This grant does not include claims that would be
+infringed only as a consequence of further modification of this
+implementation. If you or your agent or exclusive licensee institute or
+order or agree to the institution of patent litigation against any
+entity (including a cross-claim or counterclaim in a lawsuit) alleging
+that this implementation of Go or any code incorporated within this
+implementation of Go constitutes direct or contributory patent
+infringement, or inducement of patent infringement, then any patent
+rights granted to you under this License for this implementation of Go
+shall terminate as of the date such litigation is filed.
diff --git a/vendor/golang.org/x/sync/semaphore/semaphore.go b/vendor/golang.org/x/sync/semaphore/semaphore.go
new file mode 100644
index 000000000..e9d2d79a9
--- /dev/null
+++ b/vendor/golang.org/x/sync/semaphore/semaphore.go
@@ -0,0 +1,131 @@
+// Copyright 2017 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package semaphore provides a weighted semaphore implementation.
+package semaphore // import "golang.org/x/sync/semaphore"
+
+import (
+ "container/list"
+ "sync"
+
+ // Use the old context because packages that depend on this one
+ // (e.g. cloud.google.com/go/...) must run on Go 1.6.
+ // TODO(jba): update to "context" when possible.
+ "golang.org/x/net/context"
+)
+
+type waiter struct {
+ n int64
+ ready chan<- struct{} // Closed when semaphore acquired.
+}
+
+// NewWeighted creates a new weighted semaphore with the given
+// maximum combined weight for concurrent access.
+func NewWeighted(n int64) *Weighted {
+ w := &Weighted{size: n}
+ return w
+}
+
+// Weighted provides a way to bound concurrent access to a resource.
+// The callers can request access with a given weight.
+type Weighted struct {
+ size int64
+ cur int64
+ mu sync.Mutex
+ waiters list.List
+}
+
+// Acquire acquires the semaphore with a weight of n, blocking only until ctx
+// is done. On success, returns nil. On failure, returns ctx.Err() and leaves
+// the semaphore unchanged.
+//
+// If ctx is already done, Acquire may still succeed without blocking.
+func (s *Weighted) Acquire(ctx context.Context, n int64) error {
+ s.mu.Lock()
+ if s.size-s.cur >= n && s.waiters.Len() == 0 {
+ s.cur += n
+ s.mu.Unlock()
+ return nil
+ }
+
+ if n > s.size {
+ // Don't make other Acquire calls block on one that's doomed to fail.
+ s.mu.Unlock()
+ <-ctx.Done()
+ return ctx.Err()
+ }
+
+ ready := make(chan struct{})
+ w := waiter{n: n, ready: ready}
+ elem := s.waiters.PushBack(w)
+ s.mu.Unlock()
+
+ select {
+ case <-ctx.Done():
+ err := ctx.Err()
+ s.mu.Lock()
+ select {
+ case <-ready:
+ // Acquired the semaphore after we were canceled. Rather than trying to
+ // fix up the queue, just pretend we didn't notice the cancelation.
+ err = nil
+ default:
+ s.waiters.Remove(elem)
+ }
+ s.mu.Unlock()
+ return err
+
+ case <-ready:
+ return nil
+ }
+}
+
+// TryAcquire acquires the semaphore with a weight of n without blocking.
+// On success, returns true. On failure, returns false and leaves the semaphore unchanged.
+func (s *Weighted) TryAcquire(n int64) bool {
+ s.mu.Lock()
+ success := s.size-s.cur >= n && s.waiters.Len() == 0
+ if success {
+ s.cur += n
+ }
+ s.mu.Unlock()
+ return success
+}
+
+// Release releases the semaphore with a weight of n.
+func (s *Weighted) Release(n int64) {
+ s.mu.Lock()
+ s.cur -= n
+ if s.cur < 0 {
+ s.mu.Unlock()
+ panic("semaphore: bad release")
+ }
+ for {
+ next := s.waiters.Front()
+ if next == nil {
+ break // No more waiters blocked.
+ }
+
+ w := next.Value.(waiter)
+ if s.size-s.cur < w.n {
+ // Not enough tokens for the next waiter. We could keep going (to try to
+ // find a waiter with a smaller request), but under load that could cause
+ // starvation for large requests; instead, we leave all remaining waiters
+ // blocked.
+ //
+ // Consider a semaphore used as a read-write lock, with N tokens, N
+ // readers, and one writer. Each reader can Acquire(1) to obtain a read
+ // lock. The writer can Acquire(N) to obtain a write lock, excluding all
+ // of the readers. If we allow the readers to jump ahead in the queue,
+ // the writer will starve — there is always one token available for every
+ // reader.
+ break
+ }
+
+ s.cur += w.n
+ s.waiters.Remove(next)
+ close(w.ready)
+ }
+ s.mu.Unlock()
+}
diff --git a/vendor/vendor.json b/vendor/vendor.json
index 7520e4962..9e4f072b7 100644
--- a/vendor/vendor.json
+++ b/vendor/vendor.json
@@ -257,6 +257,12 @@
"revisionTime": "2017-06-03T08:13:02Z"
},
{
+ "checksumSHA1": "vcN67ZjTbGpLLwSghHCbAEvmzMo=",
+ "path": "golang.org/x/sync/semaphore",
+ "revision": "f52d1811a62927559de87708c8913c1650ce4f26",
+ "revisionTime": "2017-05-17T20:25:26Z"
+ },
+ {
"checksumSHA1": "wxrHmKhFznZZAjrYK5/nWn+fZGc=",
"path": "golang.org/x/sys/unix",
"revision": "dbc2be9168a660ef302e04b6ff6406de6f967473",