diff options
author | Paul Okstad <pokstad@gitlab.com> | 2020-08-18 18:40:35 +0300 |
---|---|---|
committer | Paul Okstad <pokstad@gitlab.com> | 2020-08-18 18:40:35 +0300 |
commit | 397a8aa41c8b1b159a667fb262aebc644719e074 (patch) | |
tree | cde52229649d0546bb09c49edcf196f18ac1be85 | |
parent | 8302f636d0a3f1b83cb7e5420b2720e83e564306 (diff) | |
parent | cf1e56867ba3158861460ba0ea45e6d60ac89852 (diff) |
Merge branch 'command-stats' into 'master'
Log cumulative per-request rusage ("command stats")
See merge request gitlab-org/gitaly!2368
-rw-r--r-- | changelogs/unreleased/command-stats.yml | 5 | ||||
-rw-r--r-- | internal/command/command.go | 21 | ||||
-rw-r--r-- | internal/command/stats.go | 61 | ||||
-rw-r--r-- | internal/command/stats_test.go | 77 | ||||
-rw-r--r-- | internal/metadata/featureflag/feature_flags.go | 2 | ||||
-rw-r--r-- | internal/middleware/commandstatshandler/commandstatshandler.go | 38 | ||||
-rw-r--r-- | internal/middleware/commandstatshandler/commandstatshandler_test.go | 121 | ||||
-rw-r--r-- | internal/server/server.go | 3 |
8 files changed, 327 insertions, 1 deletions
diff --git a/changelogs/unreleased/command-stats.yml b/changelogs/unreleased/command-stats.yml new file mode 100644 index 000000000..f126bfe2f --- /dev/null +++ b/changelogs/unreleased/command-stats.yml @@ -0,0 +1,5 @@ +--- +title: Log cumulative per-request rusage ("command stats") +merge_request: 2368 +author: +type: performance diff --git a/internal/command/command.go b/internal/command/command.go index f54b1694b..cd7ec7b83 100644 --- a/internal/command/command.go +++ b/internal/command/command.go @@ -17,6 +17,7 @@ import ( "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/internal/config" + "gitlab.com/gitlab-org/gitaly/internal/metadata/featureflag" ) const ( @@ -405,7 +406,8 @@ func (c *Command) logProcessComplete(ctx context.Context, exitCode int) { "command.real_time_ms": realTime.Seconds() * 1000, }) - if rusage, ok := cmd.ProcessState.SysUsage().(*syscall.Rusage); ok { + rusage, ok := cmd.ProcessState.SysUsage().(*syscall.Rusage) + if ok { entry = entry.WithFields(logrus.Fields{ "command.maxrss": rusage.Maxrss, "command.inblock": rusage.Inblock, @@ -413,6 +415,23 @@ func (c *Command) logProcessComplete(ctx context.Context, exitCode int) { }) } + if featureflag.IsEnabled(ctx, featureflag.LogCommandStats) { + stats := StatsFromContext(ctx) + + stats.RecordSum("command.count", 1) + stats.RecordSum("command.system_time_ms", int(systemTime.Seconds()*1000)) + stats.RecordSum("command.user_time_ms", int(userTime.Seconds()*1000)) + stats.RecordSum("command.real_time_ms", int(realTime.Seconds()*1000)) + + if ok { + stats.RecordMax("command.maxrss", int(rusage.Maxrss)) + stats.RecordSum("command.inblock", int(rusage.Inblock)) + stats.RecordSum("command.oublock", int(rusage.Oublock)) + stats.RecordSum("command.minflt", int(rusage.Minflt)) + stats.RecordSum("command.majflt", int(rusage.Majflt)) + } + } + entry.Debug("spawn complete") } diff --git a/internal/command/stats.go b/internal/command/stats.go new file mode 100644 index 000000000..89b926209 --- /dev/null +++ b/internal/command/stats.go @@ -0,0 +1,61 @@ +package command + +import ( + "context" + "sync" + + "github.com/sirupsen/logrus" +) + +type requestStatsKey struct{} + +type Stats struct { + registry map[string]int + sync.Mutex +} + +func (stats *Stats) RecordSum(key string, value int) { + stats.Lock() + defer stats.Unlock() + + if prevValue, ok := stats.registry[key]; ok { + value += prevValue + } + + stats.registry[key] = value +} + +func (stats *Stats) RecordMax(key string, value int) { + stats.Lock() + defer stats.Unlock() + + if prevValue, ok := stats.registry[key]; ok { + if prevValue > value { + return + } + } + + stats.registry[key] = value +} + +func (stats *Stats) Fields() logrus.Fields { + stats.Lock() + defer stats.Unlock() + + f := logrus.Fields{} + for k, v := range stats.registry { + f[k] = v + } + return f +} + +func StatsFromContext(ctx context.Context) *Stats { + stats, _ := ctx.Value(requestStatsKey{}).(*Stats) + return stats +} + +func InitContextStats(ctx context.Context) context.Context { + return context.WithValue(ctx, requestStatsKey{}, &Stats{ + registry: make(map[string]int), + }) +} diff --git a/internal/command/stats_test.go b/internal/command/stats_test.go new file mode 100644 index 000000000..98af29a51 --- /dev/null +++ b/internal/command/stats_test.go @@ -0,0 +1,77 @@ +package command + +import ( + "context" + "testing" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" +) + +func TestStatsFromContext_BackgroundContext(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + stats := StatsFromContext(ctx) + require.Nil(t, stats) +} + +func TestStatsFromContext_InitContext(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ctx = InitContextStats(ctx) + + stats := StatsFromContext(ctx) + + require.NotNil(t, stats) + require.Equal(t, stats.Fields(), logrus.Fields{}) +} + +func TestStatsFromContext_RecordSum(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ctx = InitContextStats(ctx) + + stats := StatsFromContext(ctx) + + stats.RecordSum("foo", 1) + stats.RecordSum("foo", 1) + + require.NotNil(t, stats) + require.Equal(t, stats.Fields(), logrus.Fields{"foo": 2}) +} + +func TestStatsFromContext_RecordSumByRef(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ctx = InitContextStats(ctx) + + stats := StatsFromContext(ctx) + + stats.RecordSum("foo", 1) + stats.RecordSum("foo", 1) + + stats2 := StatsFromContext(ctx) + + require.NotNil(t, stats2) + require.Equal(t, stats2.Fields(), logrus.Fields{"foo": 2}) +} + +func TestStatsFromContext_RecordMax(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ctx = InitContextStats(ctx) + + stats := StatsFromContext(ctx) + + stats.RecordMax("foo", 1024) + stats.RecordMax("foo", 256) + stats.RecordMax("foo", 512) + + require.NotNil(t, stats) + require.Equal(t, stats.Fields(), logrus.Fields{"foo": 1024}) +} diff --git a/internal/metadata/featureflag/feature_flags.go b/internal/metadata/featureflag/feature_flags.go index 4243aa90c..dff75b610 100644 --- a/internal/metadata/featureflag/feature_flags.go +++ b/internal/metadata/featureflag/feature_flags.go @@ -33,6 +33,8 @@ var ( // ReferenceTransactionHook will enable the reference-transaction hook // introduced with Git v2.28.0 for voting on transactions ReferenceTransactionHook = FeatureFlag{Name: "reference_transaction_hook", OnByDefault: false} + // LogCommandStats will log additional rusage stats for commands + LogCommandStats = FeatureFlag{Name: "log_command_stats", OnByDefault: false} ) // All includes all feature flags. diff --git a/internal/middleware/commandstatshandler/commandstatshandler.go b/internal/middleware/commandstatshandler/commandstatshandler.go new file mode 100644 index 000000000..8c3330f38 --- /dev/null +++ b/internal/middleware/commandstatshandler/commandstatshandler.go @@ -0,0 +1,38 @@ +package commandstatshandler + +import ( + "context" + + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" + "gitlab.com/gitlab-org/gitaly/internal/command" + "google.golang.org/grpc" +) + +// UnaryInterceptor returns a Unary Interceptor +func UnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + ctx = command.InitContextStats(ctx) + + res, err := handler(ctx, req) + + stats := command.StatsFromContext(ctx) + ctxlogrus.AddFields(ctx, stats.Fields()) + + return res, err +} + +// StreamInterceptor returns a Stream Interceptor +func StreamInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + ctx := stream.Context() + ctx = command.InitContextStats(ctx) + + wrapped := grpc_middleware.WrapServerStream(stream) + wrapped.WrappedContext = ctx + + err := handler(srv, wrapped) + + stats := command.StatsFromContext(ctx) + ctxlogrus.AddFields(ctx, stats.Fields()) + + return err +} diff --git a/internal/middleware/commandstatshandler/commandstatshandler_test.go b/internal/middleware/commandstatshandler/commandstatshandler_test.go new file mode 100644 index 000000000..f80dbe1e3 --- /dev/null +++ b/internal/middleware/commandstatshandler/commandstatshandler_test.go @@ -0,0 +1,121 @@ +package commandstatshandler + +import ( + "bytes" + "context" + "io" + "net" + "testing" + + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/metadata/featureflag" + "gitlab.com/gitlab-org/gitaly/internal/service/ref" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "google.golang.org/grpc" + "google.golang.org/grpc/test/bufconn" +) + +func createNewServer(t *testing.T) *grpc.Server { + logger := testhelper.NewTestLogger(t) + logrusEntry := logrus.NewEntry(logger).WithField("test", t.Name()) + + opts := []grpc.ServerOption{ + grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( + grpc_logrus.StreamServerInterceptor(logrusEntry), + StreamInterceptor, + )), + grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( + grpc_logrus.UnaryServerInterceptor(logrusEntry), + UnaryInterceptor, + )), + } + + server := grpc.NewServer(opts...) + + gitalypb.RegisterRefServiceServer(server, ref.NewServer()) + + return server +} + +func getBufDialer(listener *bufconn.Listener) func(context.Context, string) (net.Conn, error) { + return func(ctx context.Context, url string) (net.Conn, error) { + return listener.Dial() + } +} + +func TestInterceptor(t *testing.T) { + testhelper.Configure() + + logBuffer := &bytes.Buffer{} + testhelper.NewTestLogger = func(tb testing.TB) *logrus.Logger { + return &logrus.Logger{Out: logBuffer, Formatter: &logrus.JSONFormatter{}, Level: logrus.InfoLevel} + } + + s := createNewServer(t) + defer s.Stop() + + bufferSize := 1024 * 1024 + listener := bufconn.Listen(bufferSize) + go func() { + err := s.Serve(listener) + require.NoError(t, err) + }() + + tests := []struct { + name string + performRPC func(ctx context.Context, client gitalypb.RefServiceClient) + expectedLog string + }{ + { + name: "Unary", + performRPC: func(ctx context.Context, client gitalypb.RefServiceClient) { + repo := testhelper.TestRepository() + req := &gitalypb.RefExistsRequest{Repository: repo, Ref: []byte("refs/foo")} + + _, err := client.RefExists(ctx, req) + require.NoError(t, err) + }, + expectedLog: "\"command.count\":1", + }, + { + name: "Stream", + performRPC: func(ctx context.Context, client gitalypb.RefServiceClient) { + repo := testhelper.TestRepository() + req := &gitalypb.FindAllBranchNamesRequest{Repository: repo} + + stream, err := client.FindAllBranchNames(ctx, req) + require.NoError(t, err) + + for { + _, err := stream.Recv() + if err == io.EOF { + break + } + require.NoError(t, err) + } + }, + expectedLog: "\"command.count\":1", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + logBuffer.Reset() + + ctx := context.TODO() + ctx = featureflag.OutgoingCtxWithFeatureFlags(ctx, featureflag.LogCommandStats) + + conn, err := grpc.DialContext(ctx, "", grpc.WithContextDialer(getBufDialer(listener)), grpc.WithInsecure()) + require.NoError(t, err) + defer conn.Close() + + client := gitalypb.NewRefServiceClient(conn) + + tt.performRPC(ctx, client) + require.Contains(t, logBuffer.String(), tt.expectedLog) + }) + } +} diff --git a/internal/server/server.go b/internal/server/server.go index 243b387c6..6024bfe1d 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -18,6 +18,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/logsanitizer" "gitlab.com/gitlab-org/gitaly/internal/middleware/cache" "gitlab.com/gitlab-org/gitaly/internal/middleware/cancelhandler" + "gitlab.com/gitlab-org/gitaly/internal/middleware/commandstatshandler" "gitlab.com/gitlab-org/gitaly/internal/middleware/limithandler" "gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/internal/middleware/panichandler" @@ -88,6 +89,7 @@ func createNewServer(rubyServer *rubyserver.Server, gitlabAPI hook.GitlabAPI, cf metadatahandler.StreamInterceptor, grpc_prometheus.StreamServerInterceptor, grpc_logrus.StreamServerInterceptor(logrusEntry), + commandstatshandler.StreamInterceptor, sentryhandler.StreamLogHandler, cancelhandler.Stream, // Should be below LogHandler auth.StreamServerInterceptor(cfg.Auth), @@ -104,6 +106,7 @@ func createNewServer(rubyServer *rubyserver.Server, gitlabAPI hook.GitlabAPI, cf metadatahandler.UnaryInterceptor, grpc_prometheus.UnaryServerInterceptor, grpc_logrus.UnaryServerInterceptor(logrusEntry), + commandstatshandler.UnaryInterceptor, sentryhandler.UnaryLogHandler, cancelhandler.Unary, // Should be below LogHandler auth.UnaryServerInterceptor(cfg.Auth), |