diff options
author | Jacob Vosmaer <jacob@gitlab.com> | 2021-08-10 14:12:23 +0300 |
---|---|---|
committer | Jacob Vosmaer <jacob@gitlab.com> | 2021-08-10 17:42:38 +0300 |
commit | 84a84e95ab791803b9ea3f57bb48982f4359abcc (patch) | |
tree | f6e11b48b0717a9034ffc9cc0e3dfad7e3458f37 | |
parent | 8fe0f38a98ddcc5e2cb68f7593e1514c6a1287b1 (diff) |
Move pack.stat logging to PackObjectsHook
This extracts the message we want to log closer to the source, and
lets us avoid parsing the response stream of {Post,SSH}UploadPack
which is a hot code path.
Changelog: other
-rw-r--r-- | internal/gitaly/service/hook/pack_objects.go | 21 | ||||
-rw-r--r-- | internal/gitaly/service/hook/pack_objects_test.go | 15 | ||||
-rw-r--r-- | internal/gitaly/service/inspect/inspector.go | 68 | ||||
-rw-r--r-- | internal/gitaly/service/inspect/inspector_test.go | 116 | ||||
-rw-r--r-- | internal/gitaly/service/smarthttp/upload_pack.go | 8 | ||||
-rw-r--r-- | internal/gitaly/service/ssh/upload_pack.go | 7 |
6 files changed, 37 insertions, 198 deletions
diff --git a/internal/gitaly/service/hook/pack_objects.go b/internal/gitaly/service/hook/pack_objects.go index 3176d5fa7..7d6cedb05 100644 --- a/internal/gitaly/service/hook/pack_objects.go +++ b/internal/gitaly/service/hook/pack_objects.go @@ -189,10 +189,15 @@ func (s *server) runPackObjects(ctx context.Context, w io.Writer, repo *gitalypb defer func() { generatedBytes := stdout.N + stderr.N packObjectsGeneratedBytes.Add(float64(generatedBytes)) - ctxlogrus.Extract(ctx).WithFields(logrus.Fields{ + logger := ctxlogrus.Extract(ctx) + logger.WithFields(logrus.Fields{ "cache_key": key, "bytes": generatedBytes, }).Info("generated bytes") + + if total := totalMessage(stderrBuf.Bytes()); total != "" { + logger.WithField("pack.stat", total).Info("pack file compression statistic") + } }() cmd, err := s.gitCmdFactory.New(ctx, repo, args.subcmd(), @@ -212,6 +217,20 @@ func (s *server) runPackObjects(ctx context.Context, w io.Writer, repo *gitalypb return nil } +func totalMessage(stderr []byte) string { + start := bytes.Index(stderr, []byte("Total ")) + if start < 0 { + return "" + } + + end := bytes.Index(stderr[start:], []byte("\n")) + if end < 0 { + return "" + } + + return string(stderr[start : start+end]) +} + var ( errNoPackObjects = errors.New("missing pack-objects") errNonFlagArg = errors.New("non-flag argument") diff --git a/internal/gitaly/service/hook/pack_objects_test.go b/internal/gitaly/service/hook/pack_objects_test.go index b6d3c9879..a10f1466e 100644 --- a/internal/gitaly/service/hook/pack_objects_test.go +++ b/internal/gitaly/service/hook/pack_objects_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "io" + "strings" "testing" "time" @@ -130,6 +131,20 @@ func TestServer_PackObjectsHook(t *testing.T) { require.Greater(t, entry.Data["bytes"], int64(0)) }) } + + t.Run("pack file compression statistic", func(t *testing.T) { + var entry *logrus.Entry + for _, e := range hook.AllEntries() { + if e.Message == "pack file compression statistic" { + entry = e + } + } + + require.NotNil(t, entry) + total := entry.Data["pack.stat"].(string) + require.True(t, strings.HasPrefix(total, "Total ")) + require.False(t, strings.Contains(total, "\n")) + }) }) } } diff --git a/internal/gitaly/service/inspect/inspector.go b/internal/gitaly/service/inspect/inspector.go deleted file mode 100644 index ee625cb4f..000000000 --- a/internal/gitaly/service/inspect/inspector.go +++ /dev/null @@ -1,68 +0,0 @@ -package inspect - -import ( - "bytes" - "context" - "io" - "io/ioutil" - - "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" - "gitlab.com/gitlab-org/gitaly/v14/internal/git/pktline" - "gitlab.com/gitlab-org/gitaly/v14/internal/helper/text" -) - -// NewWriter returns Writer that will feed 'action' with data on each write to it. -// It is required to call Close once all data is processed. -// Close will be blocked until action is completed. -func NewWriter(writer io.Writer, action func(reader io.Reader)) io.WriteCloser { - pr, pw := io.Pipe() - - multiOut := io.MultiWriter(writer, pw) - c := closer{c: pw, done: make(chan struct{})} - - go func() { - defer close(c.done) // channel close signals that action is completed - - action(pr) - - _, _ = io.Copy(ioutil.Discard, pr) // consume all data to unblock multiOut - }() - - return struct { - io.Writer - io.Closer - }{ - Writer: multiOut, - Closer: c, // pw must be closed otherwise goroutine serving 'action' won't be terminated - } -} - -type closer struct { - c io.Closer - done chan struct{} -} - -// Close closes wrapped Closer and waits for done to be closed. -func (c closer) Close() error { - defer func() { <-c.done }() - return c.c.Close() -} - -// LogPackInfoStatistic inspect data stream for the informational messages -// and logs info about pack file usage. -func LogPackInfoStatistic(ctx context.Context) func(reader io.Reader) { - return func(reader io.Reader) { - logger := ctxlogrus.Extract(ctx) - - scanner := pktline.NewScanner(reader) - for scanner.Scan() { - pktData := pktline.Data(scanner.Bytes()) - if !bytes.HasPrefix(pktData, []byte("\x02Total ")) { - continue - } - - logger.WithField("pack.stat", text.ChompBytes(pktData[1:])).Info("pack file compression statistic") - } - // we are not interested in scanner.Err() - } -} diff --git a/internal/gitaly/service/inspect/inspector_test.go b/internal/gitaly/service/inspect/inspector_test.go deleted file mode 100644 index cc9b825a4..000000000 --- a/internal/gitaly/service/inspect/inspector_test.go +++ /dev/null @@ -1,116 +0,0 @@ -package inspect - -import ( - "bytes" - "errors" - "io" - "io/ioutil" - "strings" - "sync/atomic" - "testing" - - "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" - "github.com/sirupsen/logrus" - "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" -) - -func TestWrite(t *testing.T) { - for _, tc := range []struct { - desc string - action func(io.Reader) - src io.Reader - exp []byte - expErrStr string - }{ - { - desc: "all data consumed without errors", - action: func(reader io.Reader) { - data, err := ioutil.ReadAll(reader) - require.NoError(t, err) - require.Equal(t, []byte("somedata"), data) - }, - src: strings.NewReader("somedata"), - exp: []byte("somedata"), - }, - { - desc: "no data is ok", - action: func(reader io.Reader) { - data, err := ioutil.ReadAll(reader) - require.NoError(t, err) - require.Empty(t, data) - }, - src: bytes.NewReader(nil), - exp: []byte{}, - }, - { - desc: "consumed by action partially", - action: func(reader io.Reader) { - b := make([]byte, 4) - n, err := reader.Read(b) - require.NoError(t, err) - require.Equal(t, 4, n) - require.Equal(t, []byte("some"), b) - }, - src: strings.NewReader("somedata"), - exp: []byte("somedata"), - }, - { - desc: "error on read", - action: func(reader io.Reader) {}, - src: errReader("bad read"), - exp: []byte{}, - expErrStr: "bad read", - }, - } { - t.Run(tc.desc, func(t *testing.T) { - mainWriter := &bytes.Buffer{} - var checked int32 - - writer := NewWriter(mainWriter, func(reader io.Reader) { - tc.action(reader) - atomic.StoreInt32(&checked, 1) - }) - - _, err := io.Copy(writer, tc.src) - if tc.expErrStr != "" { - require.EqualError(t, err, tc.expErrStr) - } else { - require.NoError(t, err) - } - - data, err := ioutil.ReadAll(mainWriter) - require.NoError(t, err) - require.Equal(t, tc.exp, data) - - require.NoError(t, writer.Close()) - require.Equal(t, int32(1), atomic.LoadInt32(&checked)) - }) - } -} - -type errReader string - -func (e errReader) Read(p []byte) (n int, err error) { - return 0, errors.New(string(e)) -} - -func TestLogPackInfoStatistic(t *testing.T) { - dest := &bytes.Buffer{} - log := &logrus.Logger{ - Out: dest, - Formatter: &logrus.JSONFormatter{}, - Level: logrus.InfoLevel, - } - - ctx, cancel := testhelper.Context() - defer cancel() - - ctx = ctxlogrus.ToContext(ctx, log.WithField("test", "logging")) - - logging := LogPackInfoStatistic(ctx) - logging(strings.NewReader("0038\x41ACK 1e292f8fedd741b75372e19097c76d327140c312 ready\n0035\x02Total 1044 (delta 519), reused 1035 (delta 512)\n0038\x41ACK 1e292f8fedd741b75372e19097c76d327140c312 ready\n0000\x01")) - - require.Contains(t, dest.String(), "Total 1044 (delta 519), reused 1035 (delta 512)") - require.NotContains(t, dest.String(), "ACK 1e292f8fedd741b75372e19097c76d327140c312") -} diff --git a/internal/gitaly/service/smarthttp/upload_pack.go b/internal/gitaly/service/smarthttp/upload_pack.go index a4edc778c..f6744f1d2 100644 --- a/internal/gitaly/service/smarthttp/upload_pack.go +++ b/internal/gitaly/service/smarthttp/upload_pack.go @@ -10,7 +10,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/command" "gitlab.com/gitlab-org/gitaly/v14/internal/git" "gitlab.com/gitlab-org/gitaly/v14/internal/git/stats" - "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/inspect" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/v14/streamio" "google.golang.org/grpc/codes" @@ -42,16 +41,11 @@ func (s *server) PostUploadPack(stream gitalypb.SmartHTTPService_PostUploadPackS var respBytes int64 - stdoutWriter := streamio.NewWriter(func(p []byte) error { + stdout := streamio.NewWriter(func(p []byte) error { respBytes += int64(len(p)) return stream.Send(&gitalypb.PostUploadPackResponse{Data: p}) }) - // TODO: it is first step of the https://gitlab.com/gitlab-org/gitaly/issues/1519 - // needs to be removed after we get some statistics on this - stdout := inspect.NewWriter(stdoutWriter, inspect.LogPackInfoStatistic(ctx)) - defer stdout.Close() - repoPath, err := s.locator.GetRepoPath(req.Repository) if err != nil { return err diff --git a/internal/gitaly/service/ssh/upload_pack.go b/internal/gitaly/service/ssh/upload_pack.go index 550aea167..4b7abb315 100644 --- a/internal/gitaly/service/ssh/upload_pack.go +++ b/internal/gitaly/service/ssh/upload_pack.go @@ -12,7 +12,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/git" "gitlab.com/gitlab-org/gitaly/v14/internal/git/pktline" "gitlab.com/gitlab-org/gitaly/v14/internal/git/stats" - "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/inspect" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/v14/streamio" @@ -59,13 +58,9 @@ func (s *server) sshUploadPack(stream gitalypb.SSHService_SSHUploadPackServer, r // synchronize writing stdout and stderrr. var m sync.Mutex - stdoutWriter := streamio.NewSyncWriter(&m, func(p []byte) error { + stdout := streamio.NewSyncWriter(&m, func(p []byte) error { return stream.Send(&gitalypb.SSHUploadPackResponse{Stdout: p}) }) - // TODO: it is first step of the https://gitlab.com/gitlab-org/gitaly/issues/1519 - // needs to be removed after we get some statistics on this - stdout := inspect.NewWriter(stdoutWriter, inspect.LogPackInfoStatistic(ctx)) - defer stdout.Close() stderr := streamio.NewSyncWriter(&m, func(p []byte) error { return stream.Send(&gitalypb.SSHUploadPackResponse{Stderr: p}) |