Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJacob Vosmaer <jacob@gitlab.com>2021-08-10 14:12:23 +0300
committerJacob Vosmaer <jacob@gitlab.com>2021-08-10 17:42:38 +0300
commit84a84e95ab791803b9ea3f57bb48982f4359abcc (patch)
treef6e11b48b0717a9034ffc9cc0e3dfad7e3458f37
parent8fe0f38a98ddcc5e2cb68f7593e1514c6a1287b1 (diff)
Move pack.stat logging to PackObjectsHook
This extracts the message we want to log closer to the source, and lets us avoid parsing the response stream of {Post,SSH}UploadPack which is a hot code path. Changelog: other
-rw-r--r--internal/gitaly/service/hook/pack_objects.go21
-rw-r--r--internal/gitaly/service/hook/pack_objects_test.go15
-rw-r--r--internal/gitaly/service/inspect/inspector.go68
-rw-r--r--internal/gitaly/service/inspect/inspector_test.go116
-rw-r--r--internal/gitaly/service/smarthttp/upload_pack.go8
-rw-r--r--internal/gitaly/service/ssh/upload_pack.go7
6 files changed, 37 insertions, 198 deletions
diff --git a/internal/gitaly/service/hook/pack_objects.go b/internal/gitaly/service/hook/pack_objects.go
index 3176d5fa7..7d6cedb05 100644
--- a/internal/gitaly/service/hook/pack_objects.go
+++ b/internal/gitaly/service/hook/pack_objects.go
@@ -189,10 +189,15 @@ func (s *server) runPackObjects(ctx context.Context, w io.Writer, repo *gitalypb
defer func() {
generatedBytes := stdout.N + stderr.N
packObjectsGeneratedBytes.Add(float64(generatedBytes))
- ctxlogrus.Extract(ctx).WithFields(logrus.Fields{
+ logger := ctxlogrus.Extract(ctx)
+ logger.WithFields(logrus.Fields{
"cache_key": key,
"bytes": generatedBytes,
}).Info("generated bytes")
+
+ if total := totalMessage(stderrBuf.Bytes()); total != "" {
+ logger.WithField("pack.stat", total).Info("pack file compression statistic")
+ }
}()
cmd, err := s.gitCmdFactory.New(ctx, repo, args.subcmd(),
@@ -212,6 +217,20 @@ func (s *server) runPackObjects(ctx context.Context, w io.Writer, repo *gitalypb
return nil
}
+func totalMessage(stderr []byte) string {
+ start := bytes.Index(stderr, []byte("Total "))
+ if start < 0 {
+ return ""
+ }
+
+ end := bytes.Index(stderr[start:], []byte("\n"))
+ if end < 0 {
+ return ""
+ }
+
+ return string(stderr[start : start+end])
+}
+
var (
errNoPackObjects = errors.New("missing pack-objects")
errNonFlagArg = errors.New("non-flag argument")
diff --git a/internal/gitaly/service/hook/pack_objects_test.go b/internal/gitaly/service/hook/pack_objects_test.go
index b6d3c9879..a10f1466e 100644
--- a/internal/gitaly/service/hook/pack_objects_test.go
+++ b/internal/gitaly/service/hook/pack_objects_test.go
@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"io"
+ "strings"
"testing"
"time"
@@ -130,6 +131,20 @@ func TestServer_PackObjectsHook(t *testing.T) {
require.Greater(t, entry.Data["bytes"], int64(0))
})
}
+
+ t.Run("pack file compression statistic", func(t *testing.T) {
+ var entry *logrus.Entry
+ for _, e := range hook.AllEntries() {
+ if e.Message == "pack file compression statistic" {
+ entry = e
+ }
+ }
+
+ require.NotNil(t, entry)
+ total := entry.Data["pack.stat"].(string)
+ require.True(t, strings.HasPrefix(total, "Total "))
+ require.False(t, strings.Contains(total, "\n"))
+ })
})
}
}
diff --git a/internal/gitaly/service/inspect/inspector.go b/internal/gitaly/service/inspect/inspector.go
deleted file mode 100644
index ee625cb4f..000000000
--- a/internal/gitaly/service/inspect/inspector.go
+++ /dev/null
@@ -1,68 +0,0 @@
-package inspect
-
-import (
- "bytes"
- "context"
- "io"
- "io/ioutil"
-
- "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
- "gitlab.com/gitlab-org/gitaly/v14/internal/git/pktline"
- "gitlab.com/gitlab-org/gitaly/v14/internal/helper/text"
-)
-
-// NewWriter returns Writer that will feed 'action' with data on each write to it.
-// It is required to call Close once all data is processed.
-// Close will be blocked until action is completed.
-func NewWriter(writer io.Writer, action func(reader io.Reader)) io.WriteCloser {
- pr, pw := io.Pipe()
-
- multiOut := io.MultiWriter(writer, pw)
- c := closer{c: pw, done: make(chan struct{})}
-
- go func() {
- defer close(c.done) // channel close signals that action is completed
-
- action(pr)
-
- _, _ = io.Copy(ioutil.Discard, pr) // consume all data to unblock multiOut
- }()
-
- return struct {
- io.Writer
- io.Closer
- }{
- Writer: multiOut,
- Closer: c, // pw must be closed otherwise goroutine serving 'action' won't be terminated
- }
-}
-
-type closer struct {
- c io.Closer
- done chan struct{}
-}
-
-// Close closes wrapped Closer and waits for done to be closed.
-func (c closer) Close() error {
- defer func() { <-c.done }()
- return c.c.Close()
-}
-
-// LogPackInfoStatistic inspect data stream for the informational messages
-// and logs info about pack file usage.
-func LogPackInfoStatistic(ctx context.Context) func(reader io.Reader) {
- return func(reader io.Reader) {
- logger := ctxlogrus.Extract(ctx)
-
- scanner := pktline.NewScanner(reader)
- for scanner.Scan() {
- pktData := pktline.Data(scanner.Bytes())
- if !bytes.HasPrefix(pktData, []byte("\x02Total ")) {
- continue
- }
-
- logger.WithField("pack.stat", text.ChompBytes(pktData[1:])).Info("pack file compression statistic")
- }
- // we are not interested in scanner.Err()
- }
-}
diff --git a/internal/gitaly/service/inspect/inspector_test.go b/internal/gitaly/service/inspect/inspector_test.go
deleted file mode 100644
index cc9b825a4..000000000
--- a/internal/gitaly/service/inspect/inspector_test.go
+++ /dev/null
@@ -1,116 +0,0 @@
-package inspect
-
-import (
- "bytes"
- "errors"
- "io"
- "io/ioutil"
- "strings"
- "sync/atomic"
- "testing"
-
- "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
- "github.com/sirupsen/logrus"
- "github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
-)
-
-func TestWrite(t *testing.T) {
- for _, tc := range []struct {
- desc string
- action func(io.Reader)
- src io.Reader
- exp []byte
- expErrStr string
- }{
- {
- desc: "all data consumed without errors",
- action: func(reader io.Reader) {
- data, err := ioutil.ReadAll(reader)
- require.NoError(t, err)
- require.Equal(t, []byte("somedata"), data)
- },
- src: strings.NewReader("somedata"),
- exp: []byte("somedata"),
- },
- {
- desc: "no data is ok",
- action: func(reader io.Reader) {
- data, err := ioutil.ReadAll(reader)
- require.NoError(t, err)
- require.Empty(t, data)
- },
- src: bytes.NewReader(nil),
- exp: []byte{},
- },
- {
- desc: "consumed by action partially",
- action: func(reader io.Reader) {
- b := make([]byte, 4)
- n, err := reader.Read(b)
- require.NoError(t, err)
- require.Equal(t, 4, n)
- require.Equal(t, []byte("some"), b)
- },
- src: strings.NewReader("somedata"),
- exp: []byte("somedata"),
- },
- {
- desc: "error on read",
- action: func(reader io.Reader) {},
- src: errReader("bad read"),
- exp: []byte{},
- expErrStr: "bad read",
- },
- } {
- t.Run(tc.desc, func(t *testing.T) {
- mainWriter := &bytes.Buffer{}
- var checked int32
-
- writer := NewWriter(mainWriter, func(reader io.Reader) {
- tc.action(reader)
- atomic.StoreInt32(&checked, 1)
- })
-
- _, err := io.Copy(writer, tc.src)
- if tc.expErrStr != "" {
- require.EqualError(t, err, tc.expErrStr)
- } else {
- require.NoError(t, err)
- }
-
- data, err := ioutil.ReadAll(mainWriter)
- require.NoError(t, err)
- require.Equal(t, tc.exp, data)
-
- require.NoError(t, writer.Close())
- require.Equal(t, int32(1), atomic.LoadInt32(&checked))
- })
- }
-}
-
-type errReader string
-
-func (e errReader) Read(p []byte) (n int, err error) {
- return 0, errors.New(string(e))
-}
-
-func TestLogPackInfoStatistic(t *testing.T) {
- dest := &bytes.Buffer{}
- log := &logrus.Logger{
- Out: dest,
- Formatter: &logrus.JSONFormatter{},
- Level: logrus.InfoLevel,
- }
-
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- ctx = ctxlogrus.ToContext(ctx, log.WithField("test", "logging"))
-
- logging := LogPackInfoStatistic(ctx)
- logging(strings.NewReader("0038\x41ACK 1e292f8fedd741b75372e19097c76d327140c312 ready\n0035\x02Total 1044 (delta 519), reused 1035 (delta 512)\n0038\x41ACK 1e292f8fedd741b75372e19097c76d327140c312 ready\n0000\x01"))
-
- require.Contains(t, dest.String(), "Total 1044 (delta 519), reused 1035 (delta 512)")
- require.NotContains(t, dest.String(), "ACK 1e292f8fedd741b75372e19097c76d327140c312")
-}
diff --git a/internal/gitaly/service/smarthttp/upload_pack.go b/internal/gitaly/service/smarthttp/upload_pack.go
index a4edc778c..f6744f1d2 100644
--- a/internal/gitaly/service/smarthttp/upload_pack.go
+++ b/internal/gitaly/service/smarthttp/upload_pack.go
@@ -10,7 +10,6 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/command"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/stats"
- "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/inspect"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/v14/streamio"
"google.golang.org/grpc/codes"
@@ -42,16 +41,11 @@ func (s *server) PostUploadPack(stream gitalypb.SmartHTTPService_PostUploadPackS
var respBytes int64
- stdoutWriter := streamio.NewWriter(func(p []byte) error {
+ stdout := streamio.NewWriter(func(p []byte) error {
respBytes += int64(len(p))
return stream.Send(&gitalypb.PostUploadPackResponse{Data: p})
})
- // TODO: it is first step of the https://gitlab.com/gitlab-org/gitaly/issues/1519
- // needs to be removed after we get some statistics on this
- stdout := inspect.NewWriter(stdoutWriter, inspect.LogPackInfoStatistic(ctx))
- defer stdout.Close()
-
repoPath, err := s.locator.GetRepoPath(req.Repository)
if err != nil {
return err
diff --git a/internal/gitaly/service/ssh/upload_pack.go b/internal/gitaly/service/ssh/upload_pack.go
index 550aea167..4b7abb315 100644
--- a/internal/gitaly/service/ssh/upload_pack.go
+++ b/internal/gitaly/service/ssh/upload_pack.go
@@ -12,7 +12,6 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/pktline"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/stats"
- "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/inspect"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/v14/streamio"
@@ -59,13 +58,9 @@ func (s *server) sshUploadPack(stream gitalypb.SSHService_SSHUploadPackServer, r
// synchronize writing stdout and stderrr.
var m sync.Mutex
- stdoutWriter := streamio.NewSyncWriter(&m, func(p []byte) error {
+ stdout := streamio.NewSyncWriter(&m, func(p []byte) error {
return stream.Send(&gitalypb.SSHUploadPackResponse{Stdout: p})
})
- // TODO: it is first step of the https://gitlab.com/gitlab-org/gitaly/issues/1519
- // needs to be removed after we get some statistics on this
- stdout := inspect.NewWriter(stdoutWriter, inspect.LogPackInfoStatistic(ctx))
- defer stdout.Close()
stderr := streamio.NewSyncWriter(&m, func(p []byte) error {
return stream.Send(&gitalypb.SSHUploadPackResponse{Stderr: p})