diff options
Diffstat (limited to 'internal/git/trace2hooks')
-rw-r--r-- | internal/git/trace2hooks/log_exporter.go | 68 | ||||
-rw-r--r-- | internal/git/trace2hooks/log_exporter_test.go | 190 | ||||
-rw-r--r-- | internal/git/trace2hooks/trace_helper_test.go | 101 | ||||
-rw-r--r-- | internal/git/trace2hooks/tracingexporter_test.go | 88 |
4 files changed, 364 insertions, 83 deletions
diff --git a/internal/git/trace2hooks/log_exporter.go b/internal/git/trace2hooks/log_exporter.go new file mode 100644 index 000000000..a714d7fb3 --- /dev/null +++ b/internal/git/trace2hooks/log_exporter.go @@ -0,0 +1,68 @@ +package trace2hooks + +import ( + "context" + "encoding/json" + "fmt" + + "gitlab.com/gitlab-org/gitaly/v16/internal/git/trace2" + "gitlab.com/gitlab-org/gitaly/v16/internal/log" + "golang.org/x/time/rate" +) + +// NewLogExporter initializes LogExporter, which is a hook that uses the parsed +// trace2 events from the manager to export them to the Gitaly log. It's invocations are limited +// by the rateLimiter. The limiter allows maxBurstToken number of events to happen at once and then +// replenishes by maxEventPerSecond. It works on the token bucket algorithm where you have a number +// of tokens in the bucket to start and you can consume them in each call whilst the bucket gets +// refilled at the specified rate. +func NewLogExporter(rl *rate.Limiter, logger log.Logger) *LogExporter { + return &LogExporter{ + rateLimiter: rl, + logger: logger, + } +} + +// LogExporter is a trace2 hook that adds trace2 api event logs to Gitaly's logs. +type LogExporter struct { + logger log.Logger + rateLimiter *rate.Limiter +} + +// Name returns the name of tracing exporter +func (t *LogExporter) Name() string { + return "log_exporter" +} + +// Handle will log the trace in a readable json format in Gitaly's logs. Metadata is also collected +// and additional information is added to the log. It is also rate limited to protect it from overload +// when there are a lot of trace2 events triggered from git operations. +func (t *LogExporter) Handle(rootCtx context.Context, trace *trace2.Trace) error { + if !t.rateLimiter.Allow() { + // When the event is not allowed, return an error to the caller, this may cause traces to be skipped/dropped. + return fmt.Errorf("rate has exceeded current limit") + } + + trace.Walk(rootCtx, func(ctx context.Context, t *trace2.Trace) context.Context { + t.SetMetadata("elapsed_ms", fmt.Sprintf("%d", t.FinishTime.Sub(t.StartTime).Milliseconds())) + + return ctx + }) + + childrenJSON, err := json.Marshal(trace.Children) + if err != nil { + return fmt.Errorf("marshal json: %w", err) + } + escapedChildren := json.RawMessage(childrenJSON) + + t.logger.WithFields(log.Fields{ + "name": trace.Name, + "thread": trace.Thread, + "component": "trace2hooks." + t.Name(), + "start_time": trace.StartTime, + "finish_time": trace.FinishTime, + "metadata": trace.Metadata, + "children": escapedChildren, + }).Info("Git Trace2 API") + return nil +} diff --git a/internal/git/trace2hooks/log_exporter_test.go b/internal/git/trace2hooks/log_exporter_test.go new file mode 100644 index 000000000..6edf998f2 --- /dev/null +++ b/internal/git/trace2hooks/log_exporter_test.go @@ -0,0 +1,190 @@ +package trace2hooks + +import ( + "context" + "encoding/json" + "regexp" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/trace2" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "golang.org/x/time/rate" +) + +func TestLogExporter_Handle(t *testing.T) { + current, err := time.Parse("2006-01-02T15:04:05Z", "2023-01-01T00:00:00Z") + require.NoError(t, err) + endTime := current.Add(7 * time.Second) + exampleTrace := createExampleTrace(current) + + for _, tc := range []struct { + desc string + setup func(*testing.T) (context.Context, *trace2.Trace) + expectedTrace trace2.Trace + }{ + { + desc: "receives trace consisting of root only", + setup: func(t *testing.T) (context.Context, *trace2.Trace) { + ctx := testhelper.Context(t) + return ctx, &trace2.Trace{ + Thread: "main", + Name: "root", + StartTime: current, + FinishTime: endTime, + } + }, + expectedTrace: trace2.Trace{ + Thread: "main", + Name: "root", + StartTime: current, + FinishTime: endTime, + Metadata: map[string]string{"elapsed_ms": "7000"}, + }, + }, + { + desc: "receives a complete trace", + setup: func(t *testing.T) (context.Context, *trace2.Trace) { + ctx := testhelper.Context(t) + return ctx, exampleTrace + }, + expectedTrace: trace2.Trace{ + Thread: "main", + Name: "root", + StartTime: current, + FinishTime: endTime, + Metadata: map[string]string{"elapsed_ms": "7000"}, + Children: []*trace2.Trace{ + { + Thread: "main", + Name: "version", + StartTime: current, + FinishTime: current.Add(1 * time.Second), Metadata: map[string]string{"exe": "2.42.0", "elapsed_ms": "1000"}, + Depth: 1, + }, { + Thread: "main", + Name: "start", + StartTime: current.Add(1 * time.Second), + FinishTime: current.Add(2 * time.Second), + Metadata: map[string]string{"argv": "git fetch origin master", "elapsed_ms": "1000"}, + Depth: 1, + }, { + Thread: "main", + Name: "def_repo", + StartTime: current.Add(2 * time.Second), + FinishTime: current.Add(3 * time.Second), + Metadata: map[string]string{"worktree": "/Users/userx123/Documents/gitlab-development-kit", "elapsed_ms": "1000"}, + Depth: 1, + }, { + Thread: "main", + Name: "index:do_read_index", + StartTime: current.Add(3 * time.Second), + FinishTime: current.Add(6 * time.Second), + Metadata: map[string]string{"elapsed_ms": "3000"}, + Depth: 1, + Children: []*trace2.Trace{ + { + Thread: "main", + ChildID: "0", + Name: "cache_tree:read", + StartTime: current.Add(3 * time.Second), + FinishTime: current.Add(4 * time.Second), + Metadata: map[string]string{"elapsed_ms": "1000"}, + Depth: 2, + }, + { + Thread: "main", + ChildID: "0", + Name: "data:index:read/version", + StartTime: current.Add(4 * time.Second), + FinishTime: current.Add(5 * time.Second), + Metadata: map[string]string{"data": "2", "elapsed_ms": "1000"}, + Depth: 2, + }, + { + Thread: "main", + ChildID: "0", + Name: "data:index:read/cache_nr", + StartTime: current.Add(5 * time.Second), + FinishTime: current.Add(6 * time.Second), + Metadata: map[string]string{ + "elapsed_ms": "1000", + "data": "1500", + }, + Depth: 2, + }, + }, + }, { + Thread: "main", + Name: "submodule:parallel/fetch", + StartTime: current.Add(6 * time.Second), + FinishTime: current.Add(7 * time.Second), + Metadata: map[string]string{"elapsed_ms": "1000"}, + Depth: 1, + }, + }, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + ctx, trace := tc.setup(t) + logger := testhelper.NewLogger(t) + hook := testhelper.AddLoggerHook(logger) + exporter := NewLogExporter(rate.NewLimiter(1, 1), logger) + // execute and assertions + err := exporter.Handle(ctx, trace) + require.NoError(t, err) + logEntry := hook.LastEntry() + + assert.Equal(t, tc.expectedTrace.Thread, logEntry.Data["thread"]) + assert.Equal(t, tc.expectedTrace.Name, logEntry.Data["name"]) + assert.Equal(t, tc.expectedTrace.Metadata, logEntry.Data["metadata"]) + assert.Equal(t, tc.expectedTrace.StartTime, logEntry.Data["start_time"]) + assert.Equal(t, tc.expectedTrace.FinishTime, logEntry.Data["finish_time"]) + var children []*trace2.Trace + childErr := json.Unmarshal(logEntry.Data["children"].(json.RawMessage), &children) + require.NoError(t, childErr) + assert.Equal(t, tc.expectedTrace.Children, children) + }) + } +} + +func TestLogExporter_rateLimitFailureMock(t *testing.T) { + errorMsg := "rate has exceeded current limit" + + for _, tc := range []struct { + desc string + setup func(*testing.T) (context.Context, *trace2.Trace) + expectedError *regexp.Regexp + }{ + { + desc: "rate limit exceeded", + setup: func(t *testing.T) (context.Context, *trace2.Trace) { + ctx := testhelper.Context(t) + return ctx, &trace2.Trace{ + Thread: "main", + Name: "root", + StartTime: time.Time{}, + FinishTime: time.Time{}.Add(1 * time.Second), + } + }, + expectedError: regexp.MustCompile(errorMsg), + }, + } { + t.Run(tc.desc, func(t *testing.T) { + ctx, trace := tc.setup(t) + logger := testhelper.SharedLogger(t) + + rl := rate.NewLimiter(0, 1) // burst limit of 1, refreshing at 0 rps + + exporter := LogExporter{rateLimiter: rl, logger: logger} + err := exporter.Handle(ctx, trace) + require.NoError(t, err) + err = exporter.Handle(ctx, trace) + require.Error(t, err) + require.Regexp(t, tc.expectedError, err) + }) + } +} diff --git a/internal/git/trace2hooks/trace_helper_test.go b/internal/git/trace2hooks/trace_helper_test.go new file mode 100644 index 000000000..267711990 --- /dev/null +++ b/internal/git/trace2hooks/trace_helper_test.go @@ -0,0 +1,101 @@ +package trace2hooks + +import ( + "time" + + "gitlab.com/gitlab-org/gitaly/v16/internal/git/trace2" +) + +func createExampleTrace(startTime time.Time) *trace2.Trace { + // 0s 1s 2s 3s 4s 5s 6s 7s + // root |---->|---->|---->|---->|---->|---->|---->| + // version |---->| | | | | | | + // start | |---->| | | | | | + // def_repo | | |---->| | | | | + // index:do_read_index | | | |---->|---->|---->| | + // cache_tree:read | | | |---->| | | | + // data:index:read/version | | | | |---->| | | + // data:index:read/cache_nr| | | | | |---->| | + // submodule:parallel/fetch| | | | | | |---->| + return connectChildren(&trace2.Trace{ + Thread: "main", + Name: "root", + StartTime: startTime, + FinishTime: startTime.Add(7 * time.Second), + Depth: 0, + Children: []*trace2.Trace{ + { + Thread: "main", + Name: "version", + StartTime: startTime, + FinishTime: startTime.Add(1 * time.Second), Metadata: map[string]string{"exe": "2.42.0"}, + Depth: 1, + }, + { + Thread: "main", + Name: "start", + StartTime: startTime.Add(1 * time.Second), + FinishTime: startTime.Add(2 * time.Second), + Metadata: map[string]string{"argv": "git fetch origin master"}, + Depth: 1, + }, + { + Thread: "main", + Name: "def_repo", + StartTime: startTime.Add(2 * time.Second), + FinishTime: startTime.Add(3 * time.Second), + Metadata: map[string]string{"worktree": "/Users/userx123/Documents/gitlab-development-kit"}, + Depth: 1, + }, + connectChildren(&trace2.Trace{ + Thread: "main", + Name: "index:do_read_index", + StartTime: startTime.Add(3 * time.Second), + FinishTime: startTime.Add(6 * time.Second), + Depth: 1, + Children: []*trace2.Trace{ + { + Thread: "main", + Name: "cache_tree:read", + ChildID: "0", + StartTime: startTime.Add(3 * time.Second), + FinishTime: startTime.Add(4 * time.Second), + Depth: 2, + }, + { + Thread: "main", + ChildID: "0", + Name: "data:index:read/version", + StartTime: startTime.Add(4 * time.Second), + FinishTime: startTime.Add(5 * time.Second), + Metadata: map[string]string{"data": "2"}, + Depth: 2, + }, + { + Thread: "main", + ChildID: "0", + Name: "data:index:read/cache_nr", + StartTime: startTime.Add(5 * time.Second), + FinishTime: startTime.Add(6 * time.Second), + Metadata: map[string]string{"data": "1500"}, + Depth: 2, + }, + }, + }), + { + Thread: "main", + Name: "submodule:parallel/fetch", + StartTime: startTime.Add(6 * time.Second), + FinishTime: startTime.Add(7 * time.Second), + Depth: 1, + }, + }, + }) +} + +func connectChildren(trace *trace2.Trace) *trace2.Trace { + for _, t := range trace.Children { + t.Parent = trace + } + return trace +} diff --git a/internal/git/trace2hooks/tracingexporter_test.go b/internal/git/trace2hooks/tracingexporter_test.go index 062f98a96..0c7ab5a91 100644 --- a/internal/git/trace2hooks/tracingexporter_test.go +++ b/internal/git/trace2hooks/tracingexporter_test.go @@ -20,80 +20,7 @@ func TestTracingExporter_Handle(t *testing.T) { current, err := time.Parse("2006-01-02T15:04:05Z", "2023-01-01T00:00:00Z") require.NoError(t, err) - // 0s 1s 2s 3s 4s 5s 6s 7s - // root |---->|---->|---->|---->|---->|---->|---->| - // version |---->| | | | | | | - // start | |---->| | | | | | - // def_repo | | |---->| | | | | - // index:do_read_index | | | |---->|---->|---->| | - // cache_tree:read | | | |---->| | | | - // data:index:read/version | | | | |---->| | | - // data:index:read/cache_nr| | | | | |---->| | - // submodule:parallel/fetch| | | | | | |---->| - exampleTrace := connectChildren(&trace2.Trace{ - Thread: "main", - Name: "root", - StartTime: current, - FinishTime: current.Add(7 * time.Second), - Children: []*trace2.Trace{ - { - Thread: "main", - Name: "version", - StartTime: current, - FinishTime: current.Add(1 * time.Second), - }, - { - Thread: "main", - Name: "start", - StartTime: current.Add(1 * time.Second), - FinishTime: current.Add(2 * time.Second), - Metadata: map[string]string{"argv": "git fetch origin master"}, - }, - { - Thread: "main", - Name: "def_repo", - StartTime: current.Add(2 * time.Second), - FinishTime: current.Add(3 * time.Second), - }, - connectChildren(&trace2.Trace{ - Thread: "main", - Name: "index:do_read_index", - StartTime: current.Add(3 * time.Second), - FinishTime: current.Add(6 * time.Second), - Children: []*trace2.Trace{ - { - Thread: "main", - Name: "cache_tree:read", - ChildID: "0", - StartTime: current.Add(3 * time.Second), - FinishTime: current.Add(4 * time.Second), - }, - { - Thread: "main", - ChildID: "0", - Name: "data:index:read/version", - StartTime: current.Add(4 * time.Second), - FinishTime: current.Add(5 * time.Second), - Metadata: map[string]string{"data": "2"}, - }, - { - Thread: "main", - ChildID: "0", - Name: "data:index:read/cache_nr", - StartTime: current.Add(5 * time.Second), - FinishTime: current.Add(6 * time.Second), - Metadata: map[string]string{"data": "1500"}, - }, - }, - }), - { - Thread: "main", - Name: "submodule:parallel/fetch", - StartTime: current.Add(6 * time.Second), - FinishTime: current.Add(7 * time.Second), - }, - }, - }) + exampleTrace := createExampleTrace(current) for _, tc := range []struct { desc string @@ -136,6 +63,7 @@ func TestTracingExporter_Handle(t *testing.T) { Tags: map[string]string{ "childID": "", "thread": "main", + "exe": "2.42.0", }, }, { @@ -153,8 +81,9 @@ func TestTracingExporter_Handle(t *testing.T) { StartTime: current.Add(2 * time.Second), Duration: 1 * time.Second, Tags: map[string]string{ - "childID": "", - "thread": "main", + "childID": "", + "thread": "main", + "worktree": "/Users/userx123/Documents/gitlab-development-kit", }, }, { @@ -227,10 +156,3 @@ func TestTracingExporter_Handle(t *testing.T) { }) } } - -func connectChildren(trace *trace2.Trace) *trace2.Trace { - for _, t := range trace.Children { - t.Parent = trace - } - return trace -} |