Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2022-06-02 17:46:48 +0300
committerJohn Cai <jcai@gitlab.com>2022-07-13 19:00:04 +0300
commit0f8ab08a73426a0181c6ab6eb6b8641878a822e6 (patch)
tree5959bc397c7aada0821a7d1620ed6d402d5813cc
parente4a8d5e24986a1f3f8c113d6f2d4e14d2d21b952 (diff)
hook: Add concurrency tracker
Add a concurrency tracker whose job it is to keep track of concurrent calls based on some key type and key. This will be used for example, to track the number of concurrent calls to pack objects based on user_id as well as repository. The LogConcurrency function returns a cleanup function, which should get called when the caller finishes its process. When the concurrent processes for a certain key is at 0, the element will get removed from the map to save memory. There are also two metrics that will give us increased insight to the traffic patterns. The first metric gives us the total active callers broken up by segment (repository, user_id, or even something else in the future). This answers the question "how many unique ___"(user_id, repository) have active pack object processes? The second metric gives us a histogram of concurrent pack object processes so we can answer the question "what's the p99 of concurrent pack object processes for users? repositories?"
-rw-r--r--internal/gitaly/hook/concurrency_tracker.go116
-rw-r--r--internal/gitaly/hook/concurrency_tracker_test.go280
2 files changed, 396 insertions, 0 deletions
diff --git a/internal/gitaly/hook/concurrency_tracker.go b/internal/gitaly/hook/concurrency_tracker.go
new file mode 100644
index 000000000..54d8ad190
--- /dev/null
+++ b/internal/gitaly/hook/concurrency_tracker.go
@@ -0,0 +1,116 @@
+package hook
+
+import (
+ "context"
+ "sync"
+
+ "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
+ "github.com/prometheus/client_golang/prometheus"
+)
+
+// NewConcurrencyTracker creates a new ConcurrencyTracker.
+func NewConcurrencyTracker() *ConcurrencyTracker {
+ c := &ConcurrencyTracker{
+ concurrencyMap: make(map[string]int),
+ currentCallersVec: prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Name: "gitaly_pack_objects_process_active_callers",
+ Help: "Number of unique callers that have an active pack objects processes",
+ },
+ []string{"segment"},
+ ),
+ totalCallersVec: prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: "gitaly_pack_objects_process_active_callers_total",
+ Help: "Total unique callers that have initiated a pack objects processes",
+ },
+ []string{"segment"},
+ ),
+ concurrentProcessesVec: prometheus.NewHistogramVec(
+ prometheus.HistogramOpts{
+ Name: "gitaly_pack_objects_concurrent_processes",
+ Help: "Number of concurrent processes",
+ Buckets: prometheus.LinearBuckets(0, 5, 20),
+ },
+ []string{"segment"},
+ ),
+ }
+
+ return c
+}
+
+// ConcurrencyTracker tracks concurrency of pack object calls
+type ConcurrencyTracker struct {
+ lock sync.Mutex
+ concurrencyMap map[string]int
+ currentCallersVec *prometheus.GaugeVec
+ totalCallersVec *prometheus.CounterVec
+ concurrentProcessesVec *prometheus.HistogramVec
+}
+
+func (c *ConcurrencyTracker) addConcurrencyDelta(compositeKey string, delta int) int {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+
+ c.concurrencyMap[compositeKey] += delta
+
+ if c.concurrencyMap[compositeKey] == 0 {
+ delete(c.concurrencyMap, compositeKey)
+ }
+
+ return c.concurrencyMap[compositeKey]
+}
+
+// LogConcurrency logs the number of concurrent calls for a keyType, key
+// combination
+func (c *ConcurrencyTracker) LogConcurrency(ctx context.Context, keyType, key string) func() {
+ compositeKey := keyType + ":" + key
+
+ concurrency := c.addConcurrencyDelta(compositeKey, 1)
+ if concurrency == 1 {
+ // If there are no entries in the map for this keyType, key
+ // combination, it means this is the first pack objects process
+ // for this keyType, key. Hence, we increment the total number
+ // of active callers for this keyType as this is a newly active
+ // caller of a pack object process. In finish, when the pack
+ // object process finishes, we check if it is the only active
+ // call and if so, we decrement this metric since that means the
+ // keyType, key caller is no longer responsible for any pack
+ c.currentCallersVec.WithLabelValues(keyType).Inc()
+ c.totalCallersVec.WithLabelValues(keyType).Inc()
+ }
+
+ c.concurrentProcessesVec.WithLabelValues(
+ keyType,
+ ).Observe(float64(concurrency))
+
+ ctxlogrus.Extract(ctx).
+ WithField("concurrency_type", keyType).
+ WithField("concurrency_key", key).
+ WithField("concurrency", concurrency).
+ Info("concurrency")
+
+ return func() {
+ c.finish(keyType, compositeKey)
+ }
+}
+
+func (c *ConcurrencyTracker) finish(keyType, compositeKey string) {
+ if c.addConcurrencyDelta(compositeKey, -1) == 0 {
+ c.currentCallersVec.WithLabelValues(keyType).Dec()
+ }
+}
+
+// Collect allows ConcurrencyTracker to adhere to the prometheus.Collector
+// interface for collecing metrics.
+func (c *ConcurrencyTracker) Collect(ch chan<- prometheus.Metric) {
+ c.currentCallersVec.Collect(ch)
+ c.totalCallersVec.Collect(ch)
+ c.concurrentProcessesVec.Collect(ch)
+}
+
+// Describe allows ConcurrencyTracker to adhere to the prometheus.Collector
+// interface for collecing metrics
+func (c *ConcurrencyTracker) Describe(ch chan<- *prometheus.Desc) {
+ prometheus.DescribeByCollect(c, ch)
+}
diff --git a/internal/gitaly/hook/concurrency_tracker_test.go b/internal/gitaly/hook/concurrency_tracker_test.go
new file mode 100644
index 000000000..77f74ac58
--- /dev/null
+++ b/internal/gitaly/hook/concurrency_tracker_test.go
@@ -0,0 +1,280 @@
+package hook
+
+import (
+ "bytes"
+ "context"
+ "sync"
+ "testing"
+
+ "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
+ "github.com/prometheus/client_golang/prometheus/testutil"
+ "github.com/sirupsen/logrus"
+ "github.com/sirupsen/logrus/hooks/test"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
+)
+
+func TestConcurrencyTracker(t *testing.T) {
+ testCases := []struct {
+ desc string
+ calls func(ctx context.Context, c *ConcurrencyTracker)
+ expectedLogData []logrus.Fields
+ }{
+ {
+ desc: "single call",
+ calls: func(ctx context.Context, c *ConcurrencyTracker) {
+ finish := c.LogConcurrency(ctx, "repository", "a/b/c")
+ defer finish()
+ finish = c.LogConcurrency(ctx, "user_id", "user-123")
+ defer finish()
+ },
+ expectedLogData: []logrus.Fields{
+ {
+ "concurrency_type": "repository",
+ "concurrency_key": "a/b/c",
+ "concurrency": 1,
+ },
+ {
+ "concurrency_type": "user_id",
+ "concurrency_key": "user-123",
+ "concurrency": 1,
+ },
+ },
+ },
+ {
+ desc: "multiple calls",
+ calls: func(ctx context.Context, c *ConcurrencyTracker) {
+ finish := c.LogConcurrency(ctx, "repository", "a/b/c")
+ defer finish()
+ finish = c.LogConcurrency(ctx, "repository", "a/b/c")
+ defer finish()
+ finish = c.LogConcurrency(ctx, "repository", "a/b/c")
+ defer finish()
+ finish = c.LogConcurrency(ctx, "user_id", "user-123")
+ defer finish()
+ finish = c.LogConcurrency(ctx, "user_id", "user-123")
+ defer finish()
+ finish = c.LogConcurrency(ctx, "user_id", "user-123")
+ defer finish()
+ },
+ expectedLogData: []logrus.Fields{
+ {
+ "concurrency_type": "repository",
+ "concurrency_key": "a/b/c",
+ "concurrency": 1,
+ },
+ {
+ "concurrency_type": "repository",
+ "concurrency_key": "a/b/c",
+ "concurrency": 2,
+ },
+ {
+ "concurrency_type": "repository",
+ "concurrency_key": "a/b/c",
+ "concurrency": 3,
+ },
+ {
+ "concurrency_type": "user_id",
+ "concurrency_key": "user-123",
+ "concurrency": 1,
+ },
+ {
+ "concurrency_type": "user_id",
+ "concurrency_key": "user-123",
+ "concurrency": 2,
+ },
+ {
+ "concurrency_type": "user_id",
+ "concurrency_key": "user-123",
+ "concurrency": 3,
+ },
+ },
+ },
+ {
+ desc: "multiple finished calls",
+ calls: func(ctx context.Context, c *ConcurrencyTracker) {
+ finish := c.LogConcurrency(ctx, "repository", "a/b/c")
+ finish()
+ finish = c.LogConcurrency(ctx, "repository", "a/b/c")
+ finish()
+ finish = c.LogConcurrency(ctx, "repository", "a/b/c")
+ finish()
+ finish = c.LogConcurrency(ctx, "user_id", "user-123")
+ finish()
+ finish = c.LogConcurrency(ctx, "user_id", "user-123")
+ finish()
+ finish = c.LogConcurrency(ctx, "user_id", "user-123")
+ finish()
+ },
+ expectedLogData: []logrus.Fields{
+ {
+ "concurrency_type": "repository",
+ "concurrency_key": "a/b/c",
+ "concurrency": 1,
+ },
+ {
+ "concurrency_type": "repository",
+ "concurrency_key": "a/b/c",
+ "concurrency": 1,
+ },
+ {
+ "concurrency_type": "repository",
+ "concurrency_key": "a/b/c",
+ "concurrency": 1,
+ },
+ {
+ "concurrency_type": "user_id",
+ "concurrency_key": "user-123",
+ "concurrency": 1,
+ },
+ {
+ "concurrency_type": "user_id",
+ "concurrency_key": "user-123",
+ "concurrency": 1,
+ },
+ {
+ "concurrency_type": "user_id",
+ "concurrency_key": "user-123",
+ "concurrency": 1,
+ },
+ },
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.desc, func(t *testing.T) {
+ ctx := testhelper.Context(t)
+
+ l, hook := test.NewNullLogger()
+
+ ctx = ctxlogrus.ToContext(ctx, logrus.NewEntry(l))
+
+ c := NewConcurrencyTracker()
+
+ tc.calls(ctx, c)
+
+ require.Len(t, hook.Entries, len(tc.expectedLogData))
+ for i := 0; i < len(hook.Entries); i++ {
+ assert.Equal(t, tc.expectedLogData[i], hook.Entries[i].Data)
+ assert.Equal(t, "concurrency", hook.Entries[i].Message)
+ }
+
+ assert.Len(t, c.concurrencyMap, 0)
+ })
+ }
+}
+
+func TestConcurrencyTrackerConcurrentCalls(t *testing.T) {
+ ctx := testhelper.Context(t)
+
+ l, hook := test.NewNullLogger()
+
+ ctx = ctxlogrus.ToContext(ctx, logrus.NewEntry(l))
+
+ c := NewConcurrencyTracker()
+
+ var wg sync.WaitGroup
+ wg.Add(3)
+
+ for i := 0; i < 3; i++ {
+ go func() {
+ defer wg.Done()
+ finish := c.LogConcurrency(ctx, "repository", "a/b/c")
+ defer finish()
+ }()
+ }
+
+ wg.Wait()
+
+ require.Len(t, hook.Entries, 3)
+
+ for i := 0; i < len(hook.Entries); i++ {
+ assert.Equal(t, "a/b/c", hook.Entries[i].Data["concurrency_key"])
+ assert.Equal(t, "repository", hook.Entries[i].Data["concurrency_type"])
+ assert.Equal(t, "concurrency", hook.Entries[i].Message)
+ }
+
+ assert.Len(t, c.concurrencyMap, 0)
+}
+
+func TestConcurrencyTracker_metrics(t *testing.T) {
+ ctx := testhelper.Context(t)
+
+ c := NewConcurrencyTracker()
+
+ finish := c.LogConcurrency(ctx, "repository", "a")
+ finish()
+ c.LogConcurrency(ctx, "repository", "a")
+ c.LogConcurrency(ctx, "repository", "b")
+ c.LogConcurrency(ctx, "repository", "c")
+
+ finish = c.LogConcurrency(ctx, "user_id", "user-1")
+ finish()
+ c.LogConcurrency(ctx, "user_id", "user-1")
+ c.LogConcurrency(ctx, "user_id", "user-2")
+ c.LogConcurrency(ctx, "user_id", "user-3")
+ c.LogConcurrency(ctx, "user_id", "user-4")
+
+ expectedMetrics := `# HELP gitaly_pack_objects_concurrent_processes Number of concurrent processes
+# TYPE gitaly_pack_objects_concurrent_processes histogram
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="0"} 0
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="5"} 4
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="10"} 4
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="15"} 4
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="20"} 4
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="25"} 4
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="30"} 4
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="35"} 4
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="40"} 4
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="45"} 4
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="50"} 4
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="55"} 4
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="60"} 4
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="65"} 4
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="70"} 4
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="75"} 4
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="80"} 4
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="85"} 4
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="90"} 4
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="95"} 4
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="+Inf"} 4
+gitaly_pack_objects_concurrent_processes_sum{segment="repository"} 4
+gitaly_pack_objects_concurrent_processes_count{segment="repository"} 4
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="0"} 0
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="5"} 5
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="10"} 5
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="15"} 5
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="20"} 5
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="25"} 5
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="30"} 5
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="35"} 5
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="40"} 5
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="45"} 5
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="50"} 5
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="55"} 5
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="60"} 5
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="65"} 5
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="70"} 5
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="75"} 5
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="80"} 5
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="85"} 5
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="90"} 5
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="95"} 5
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="+Inf"} 5
+gitaly_pack_objects_concurrent_processes_sum{segment="user_id"} 5
+gitaly_pack_objects_concurrent_processes_count{segment="user_id"} 5
+# HELP gitaly_pack_objects_process_active_callers Number of unique callers that have an active pack objects processes
+# TYPE gitaly_pack_objects_process_active_callers gauge
+gitaly_pack_objects_process_active_callers{segment="repository"} 3
+gitaly_pack_objects_process_active_callers{segment="user_id"} 4
+# HELP gitaly_pack_objects_process_active_callers_total Total unique callers that have initiated a pack objects processes
+# TYPE gitaly_pack_objects_process_active_callers_total counter
+gitaly_pack_objects_process_active_callers_total{segment="repository"} 4
+gitaly_pack_objects_process_active_callers_total{segment="user_id"} 5
+`
+ require.NoError(t, testutil.CollectAndCompare(
+ c,
+ bytes.NewBufferString(expectedMetrics),
+ ))
+}