diff options
author | John Cai <jcai@gitlab.com> | 2022-06-02 17:46:48 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2022-07-13 19:00:04 +0300 |
commit | 0f8ab08a73426a0181c6ab6eb6b8641878a822e6 (patch) | |
tree | 5959bc397c7aada0821a7d1620ed6d402d5813cc | |
parent | e4a8d5e24986a1f3f8c113d6f2d4e14d2d21b952 (diff) |
hook: Add concurrency tracker
Add a concurrency tracker whose job it is to keep track of concurrent
calls based on some key type and key. This will be used, for example, to
track the number of concurrent calls to pack objects based on user_id as
well as repository.
The LogConcurrency function returns a cleanup function,
which should get called when the caller finishes its process. When the
number of concurrent processes for a certain key reaches 0, the element will be
removed from the map to save memory.
There are also two metrics that will give us increased insight into the
traffic patterns. The first metric gives us the total active callers
broken up by segment (repository, user_id, or even something else in the
future). This answers the question "how many unique ___" (user_id,
repository) have active pack object processes?
The second metric gives us a histogram of concurrent pack object
processes so we can answer the question "what's the p99 of concurrent
pack object processes for users? repositories?"
-rw-r--r-- | internal/gitaly/hook/concurrency_tracker.go | 116 | ||||
-rw-r--r-- | internal/gitaly/hook/concurrency_tracker_test.go | 280 |
2 files changed, 396 insertions, 0 deletions
diff --git a/internal/gitaly/hook/concurrency_tracker.go b/internal/gitaly/hook/concurrency_tracker.go new file mode 100644 index 000000000..54d8ad190 --- /dev/null +++ b/internal/gitaly/hook/concurrency_tracker.go @@ -0,0 +1,116 @@ +package hook + +import ( + "context" + "sync" + + "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" + "github.com/prometheus/client_golang/prometheus" +) + +// NewConcurrencyTracker creates a new ConcurrencyTracker. +func NewConcurrencyTracker() *ConcurrencyTracker { + c := &ConcurrencyTracker{ + concurrencyMap: make(map[string]int), + currentCallersVec: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "gitaly_pack_objects_process_active_callers", + Help: "Number of unique callers that have an active pack objects processes", + }, + []string{"segment"}, + ), + totalCallersVec: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "gitaly_pack_objects_process_active_callers_total", + Help: "Total unique callers that have initiated a pack objects processes", + }, + []string{"segment"}, + ), + concurrentProcessesVec: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "gitaly_pack_objects_concurrent_processes", + Help: "Number of concurrent processes", + Buckets: prometheus.LinearBuckets(0, 5, 20), + }, + []string{"segment"}, + ), + } + + return c +} + +// ConcurrencyTracker tracks concurrency of pack object calls +type ConcurrencyTracker struct { + lock sync.Mutex + concurrencyMap map[string]int + currentCallersVec *prometheus.GaugeVec + totalCallersVec *prometheus.CounterVec + concurrentProcessesVec *prometheus.HistogramVec +} + +func (c *ConcurrencyTracker) addConcurrencyDelta(compositeKey string, delta int) int { + c.lock.Lock() + defer c.lock.Unlock() + + c.concurrencyMap[compositeKey] += delta + + if c.concurrencyMap[compositeKey] == 0 { + delete(c.concurrencyMap, compositeKey) + } + + return c.concurrencyMap[compositeKey] +} + +// LogConcurrency logs the number of concurrent calls for a 
keyType, key +// combination +func (c *ConcurrencyTracker) LogConcurrency(ctx context.Context, keyType, key string) func() { + compositeKey := keyType + ":" + key + + concurrency := c.addConcurrencyDelta(compositeKey, 1) + if concurrency == 1 { + // If there are no entries in the map for this keyType, key + // combination, it means this is the first pack objects process + // for this keyType, key. Hence, we increment the total number + // of active callers for this keyType as this is a newly active + // caller of a pack object process. In finish, when the pack + // object process finishes, we check if it is the only active + // call and if so, we decrement this metric since that means the + // keyType, key caller is no longer responsible for any pack + c.currentCallersVec.WithLabelValues(keyType).Inc() + c.totalCallersVec.WithLabelValues(keyType).Inc() + } + + c.concurrentProcessesVec.WithLabelValues( + keyType, + ).Observe(float64(concurrency)) + + ctxlogrus.Extract(ctx). + WithField("concurrency_type", keyType). + WithField("concurrency_key", key). + WithField("concurrency", concurrency). + Info("concurrency") + + return func() { + c.finish(keyType, compositeKey) + } +} + +func (c *ConcurrencyTracker) finish(keyType, compositeKey string) { + if c.addConcurrencyDelta(compositeKey, -1) == 0 { + c.currentCallersVec.WithLabelValues(keyType).Dec() + } +} + +// Collect allows ConcurrencyTracker to adhere to the prometheus.Collector +// interface for collecing metrics. 
+func (c *ConcurrencyTracker) Collect(ch chan<- prometheus.Metric) { + c.currentCallersVec.Collect(ch) + c.totalCallersVec.Collect(ch) + c.concurrentProcessesVec.Collect(ch) +} + +// Describe allows ConcurrencyTracker to adhere to the prometheus.Collector +// interface for collecing metrics +func (c *ConcurrencyTracker) Describe(ch chan<- *prometheus.Desc) { + prometheus.DescribeByCollect(c, ch) +} diff --git a/internal/gitaly/hook/concurrency_tracker_test.go b/internal/gitaly/hook/concurrency_tracker_test.go new file mode 100644 index 000000000..77f74ac58 --- /dev/null +++ b/internal/gitaly/hook/concurrency_tracker_test.go @@ -0,0 +1,280 @@ +package hook + +import ( + "bytes" + "context" + "sync" + "testing" + + "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus/hooks/test" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" +) + +func TestConcurrencyTracker(t *testing.T) { + testCases := []struct { + desc string + calls func(ctx context.Context, c *ConcurrencyTracker) + expectedLogData []logrus.Fields + }{ + { + desc: "single call", + calls: func(ctx context.Context, c *ConcurrencyTracker) { + finish := c.LogConcurrency(ctx, "repository", "a/b/c") + defer finish() + finish = c.LogConcurrency(ctx, "user_id", "user-123") + defer finish() + }, + expectedLogData: []logrus.Fields{ + { + "concurrency_type": "repository", + "concurrency_key": "a/b/c", + "concurrency": 1, + }, + { + "concurrency_type": "user_id", + "concurrency_key": "user-123", + "concurrency": 1, + }, + }, + }, + { + desc: "multiple calls", + calls: func(ctx context.Context, c *ConcurrencyTracker) { + finish := c.LogConcurrency(ctx, "repository", "a/b/c") + defer finish() + finish = c.LogConcurrency(ctx, "repository", "a/b/c") + defer finish() + finish = c.LogConcurrency(ctx, 
"repository", "a/b/c") + defer finish() + finish = c.LogConcurrency(ctx, "user_id", "user-123") + defer finish() + finish = c.LogConcurrency(ctx, "user_id", "user-123") + defer finish() + finish = c.LogConcurrency(ctx, "user_id", "user-123") + defer finish() + }, + expectedLogData: []logrus.Fields{ + { + "concurrency_type": "repository", + "concurrency_key": "a/b/c", + "concurrency": 1, + }, + { + "concurrency_type": "repository", + "concurrency_key": "a/b/c", + "concurrency": 2, + }, + { + "concurrency_type": "repository", + "concurrency_key": "a/b/c", + "concurrency": 3, + }, + { + "concurrency_type": "user_id", + "concurrency_key": "user-123", + "concurrency": 1, + }, + { + "concurrency_type": "user_id", + "concurrency_key": "user-123", + "concurrency": 2, + }, + { + "concurrency_type": "user_id", + "concurrency_key": "user-123", + "concurrency": 3, + }, + }, + }, + { + desc: "multiple finished calls", + calls: func(ctx context.Context, c *ConcurrencyTracker) { + finish := c.LogConcurrency(ctx, "repository", "a/b/c") + finish() + finish = c.LogConcurrency(ctx, "repository", "a/b/c") + finish() + finish = c.LogConcurrency(ctx, "repository", "a/b/c") + finish() + finish = c.LogConcurrency(ctx, "user_id", "user-123") + finish() + finish = c.LogConcurrency(ctx, "user_id", "user-123") + finish() + finish = c.LogConcurrency(ctx, "user_id", "user-123") + finish() + }, + expectedLogData: []logrus.Fields{ + { + "concurrency_type": "repository", + "concurrency_key": "a/b/c", + "concurrency": 1, + }, + { + "concurrency_type": "repository", + "concurrency_key": "a/b/c", + "concurrency": 1, + }, + { + "concurrency_type": "repository", + "concurrency_key": "a/b/c", + "concurrency": 1, + }, + { + "concurrency_type": "user_id", + "concurrency_key": "user-123", + "concurrency": 1, + }, + { + "concurrency_type": "user_id", + "concurrency_key": "user-123", + "concurrency": 1, + }, + { + "concurrency_type": "user_id", + "concurrency_key": "user-123", + "concurrency": 1, + }, + }, + 
}, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + ctx := testhelper.Context(t) + + l, hook := test.NewNullLogger() + + ctx = ctxlogrus.ToContext(ctx, logrus.NewEntry(l)) + + c := NewConcurrencyTracker() + + tc.calls(ctx, c) + + require.Len(t, hook.Entries, len(tc.expectedLogData)) + for i := 0; i < len(hook.Entries); i++ { + assert.Equal(t, tc.expectedLogData[i], hook.Entries[i].Data) + assert.Equal(t, "concurrency", hook.Entries[i].Message) + } + + assert.Len(t, c.concurrencyMap, 0) + }) + } +} + +func TestConcurrencyTrackerConcurrentCalls(t *testing.T) { + ctx := testhelper.Context(t) + + l, hook := test.NewNullLogger() + + ctx = ctxlogrus.ToContext(ctx, logrus.NewEntry(l)) + + c := NewConcurrencyTracker() + + var wg sync.WaitGroup + wg.Add(3) + + for i := 0; i < 3; i++ { + go func() { + defer wg.Done() + finish := c.LogConcurrency(ctx, "repository", "a/b/c") + defer finish() + }() + } + + wg.Wait() + + require.Len(t, hook.Entries, 3) + + for i := 0; i < len(hook.Entries); i++ { + assert.Equal(t, "a/b/c", hook.Entries[i].Data["concurrency_key"]) + assert.Equal(t, "repository", hook.Entries[i].Data["concurrency_type"]) + assert.Equal(t, "concurrency", hook.Entries[i].Message) + } + + assert.Len(t, c.concurrencyMap, 0) +} + +func TestConcurrencyTracker_metrics(t *testing.T) { + ctx := testhelper.Context(t) + + c := NewConcurrencyTracker() + + finish := c.LogConcurrency(ctx, "repository", "a") + finish() + c.LogConcurrency(ctx, "repository", "a") + c.LogConcurrency(ctx, "repository", "b") + c.LogConcurrency(ctx, "repository", "c") + + finish = c.LogConcurrency(ctx, "user_id", "user-1") + finish() + c.LogConcurrency(ctx, "user_id", "user-1") + c.LogConcurrency(ctx, "user_id", "user-2") + c.LogConcurrency(ctx, "user_id", "user-3") + c.LogConcurrency(ctx, "user_id", "user-4") + + expectedMetrics := `# HELP gitaly_pack_objects_concurrent_processes Number of concurrent processes +# TYPE gitaly_pack_objects_concurrent_processes histogram 
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="0"} 0 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="5"} 4 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="10"} 4 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="15"} 4 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="20"} 4 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="25"} 4 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="30"} 4 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="35"} 4 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="40"} 4 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="45"} 4 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="50"} 4 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="55"} 4 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="60"} 4 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="65"} 4 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="70"} 4 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="75"} 4 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="80"} 4 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="85"} 4 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="90"} 4 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="95"} 4 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="+Inf"} 4 +gitaly_pack_objects_concurrent_processes_sum{segment="repository"} 4 +gitaly_pack_objects_concurrent_processes_count{segment="repository"} 4 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="0"} 0 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="5"} 5 
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="10"} 5 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="15"} 5 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="20"} 5 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="25"} 5 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="30"} 5 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="35"} 5 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="40"} 5 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="45"} 5 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="50"} 5 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="55"} 5 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="60"} 5 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="65"} 5 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="70"} 5 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="75"} 5 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="80"} 5 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="85"} 5 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="90"} 5 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="95"} 5 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="+Inf"} 5 +gitaly_pack_objects_concurrent_processes_sum{segment="user_id"} 5 +gitaly_pack_objects_concurrent_processes_count{segment="user_id"} 5 +# HELP gitaly_pack_objects_process_active_callers Number of unique callers that have an active pack objects processes +# TYPE gitaly_pack_objects_process_active_callers gauge +gitaly_pack_objects_process_active_callers{segment="repository"} 3 +gitaly_pack_objects_process_active_callers{segment="user_id"} 4 +# HELP gitaly_pack_objects_process_active_callers_total Total unique 
callers that have initiated a pack objects processes +# TYPE gitaly_pack_objects_process_active_callers_total counter +gitaly_pack_objects_process_active_callers_total{segment="repository"} 4 +gitaly_pack_objects_process_active_callers_total{segment="user_id"} 5 +` + require.NoError(t, testutil.CollectAndCompare( + c, + bytes.NewBufferString(expectedMetrics), + )) +} |