diff options
author | Jacob Vosmaer <jacob@gitlab.com> | 2020-01-14 19:05:55 +0300 |
---|---|---|
committer | Jacob Vosmaer <jacob@gitlab.com> | 2020-01-14 19:05:55 +0300 |
commit | 060f2f6ec6bee2c9b405d31dd49e3ecdd16f4697 (patch) | |
tree | ab02dc95a2f549bc90c92f17eafd4292a57d0c22 | |
parent | 130308e43830fe42e7afc4d7bcea0f8af4c6ba5b (diff) | |
parent | 0c1d848739b72cf82e1dc62a497ba24f5fdc3225 (diff) |
Merge branch 'ps-log-pack-stat-on-clone' into 'master'
Log statistics of pack re-use on clone and fetch
Closes #2169
See merge request gitlab-org/gitaly!1743
-rw-r--r-- | changelogs/unreleased/ps-log-pack-stat-on-clone.yml | 5 | ||||
-rw-r--r-- | internal/service/inspect/inspector.go | 68 | ||||
-rw-r--r-- | internal/service/inspect/inspector_test.go | 112 | ||||
-rw-r--r-- | internal/service/smarthttp/upload_pack.go | 8 | ||||
-rw-r--r-- | internal/service/ssh/upload_pack.go | 9 |
5 files changed, 200 insertions, 2 deletions
diff --git a/changelogs/unreleased/ps-log-pack-stat-on-clone.yml b/changelogs/unreleased/ps-log-pack-stat-on-clone.yml new file mode 100644 index 000000000..9c3f95f79 --- /dev/null +++ b/changelogs/unreleased/ps-log-pack-stat-on-clone.yml @@ -0,0 +1,5 @@ +--- +title: Log statistics of pack re-use on clone and fetch +merge_request: 1743 +author: +type: other diff --git a/internal/service/inspect/inspector.go b/internal/service/inspect/inspector.go new file mode 100644 index 000000000..fcd8a1aa7 --- /dev/null +++ b/internal/service/inspect/inspector.go @@ -0,0 +1,68 @@ +package inspect + +import ( + "bytes" + "context" + "io" + "io/ioutil" + + "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" + "gitlab.com/gitlab-org/gitaly/internal/git/pktline" + "gitlab.com/gitlab-org/gitaly/internal/helper/text" +) + +// NewWriter returns Writer that will feed 'action' with data on each write to it. +// It is required to call Close once all data is processed. +// Close will be blocked until action is completed. +func NewWriter(writer io.Writer, action func(reader io.Reader)) io.WriteCloser { + pr, pw := io.Pipe() + + multiOut := io.MultiWriter(writer, pw) + c := closer{c: pw, done: make(chan struct{})} + + go func() { + defer close(c.done) // channel close signals that action is completed + + action(pr) + + _, _ = io.Copy(ioutil.Discard, pr) // consume all data to unblock multiOut + }() + + return struct { + io.Writer + io.Closer + }{ + Writer: multiOut, + Closer: c, // pw must be closed otherwise goroutine serving 'action' won't be terminated + } +} + +type closer struct { + c io.Closer + done chan struct{} +} + +// Close closes wrapped Closer and waits for done to be closed. +func (c closer) Close() error { + defer func() { <-c.done }() + return c.c.Close() +} + +// LogPackInfoStatistic inspect data stream for the informational messages +// and logs info about pack file usage. +func LogPackInfoStatistic(ctx context.Context) func(reader io.Reader) { + return func(reader io.Reader) { + logger := ctxlogrus.Extract(ctx) + + scanner := pktline.NewScanner(reader) + for scanner.Scan() { + pktData := pktline.Data(scanner.Bytes()) + if !bytes.HasPrefix(pktData, []byte("\x02Total ")) { + continue + } + + logger.WithField("pack.stat", text.ChompBytes(pktData[1:])).Info("pack file compression statistic") + } + // we are not interested in scanner.Err() + } +} diff --git a/internal/service/inspect/inspector_test.go b/internal/service/inspect/inspector_test.go new file mode 100644 index 000000000..751ace6e2 --- /dev/null +++ b/internal/service/inspect/inspector_test.go @@ -0,0 +1,112 @@ +package inspect + +import ( + "bytes" + "context" + "errors" + "io" + "io/ioutil" + "strings" + "sync/atomic" + "testing" + + "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" +) + +func TestWrite(t *testing.T) { + for _, tc := range []struct { + desc string + action func(io.Reader) + src io.Reader + exp []byte + expErrStr string + }{ + { + desc: "all data consumed without errors", + action: func(reader io.Reader) { + data, err := ioutil.ReadAll(reader) + require.NoError(t, err) + require.Equal(t, []byte("somedata"), data) + }, + src: strings.NewReader("somedata"), + exp: []byte("somedata"), + }, + { + desc: "no data is ok", + action: func(reader io.Reader) { + data, err := ioutil.ReadAll(reader) + require.NoError(t, err) + require.Empty(t, data) + }, + src: bytes.NewReader(nil), + exp: []byte{}, + }, + { + desc: "consumed by action partially", + action: func(reader io.Reader) { + b := make([]byte, 4) + n, err := reader.Read(b) + require.NoError(t, err) + require.Equal(t, 4, n) + require.Equal(t, []byte("some"), b) + }, + src: strings.NewReader("somedata"), + exp: []byte("somedata"), + }, + { + desc: "error on read", + action: func(reader io.Reader) {}, + src: errReader("bad read"), + exp: []byte{}, + expErrStr: "bad read", + }, + } { + t.Run(tc.desc, func(t *testing.T) { + mainWriter := &bytes.Buffer{} + var checked int32 + + writer := NewWriter(mainWriter, func(reader io.Reader) { + tc.action(reader) + atomic.StoreInt32(&checked, 1) + }) + + _, err := io.Copy(writer, tc.src) + if tc.expErrStr != "" { + require.EqualError(t, err, tc.expErrStr) + } else { + require.NoError(t, err) + } + + data, err := ioutil.ReadAll(mainWriter) + require.NoError(t, err) + require.Equal(t, tc.exp, data) + + require.NoError(t, writer.Close()) + require.Equal(t, int32(1), atomic.LoadInt32(&checked)) + }) + } +} + +type errReader string + +func (e errReader) Read(p []byte) (n int, err error) { + return 0, errors.New(string(e)) +} + +func TestLogPackInfoStatistic(t *testing.T) { + dest := &bytes.Buffer{} + log := &logrus.Logger{ + Out: dest, + Formatter: new(logrus.JSONFormatter), + Level: logrus.InfoLevel, + } + ctx := ctxlogrus.ToContext(context.Background(), log.WithField("test", "logging")) + + logging := LogPackInfoStatistic(ctx) + logging(strings.NewReader("0038\x41ACK 1e292f8fedd741b75372e19097c76d327140c312 ready\n0035\x02Total 1044 (delta 519), reused 1035 (delta 512)\n0038\x41ACK 1e292f8fedd741b75372e19097c76d327140c312 ready\n0000\x01")) + + require.Contains(t, dest.String(), "Total 1044 (delta 519), reused 1035 (delta 512)") + require.NotContains(t, dest.String(), "ACK 1e292f8fedd741b75372e19097c76d327140c312") +} diff --git a/internal/service/smarthttp/upload_pack.go b/internal/service/smarthttp/upload_pack.go index 4bb8dc327..6d20eb0cd 100644 --- a/internal/service/smarthttp/upload_pack.go +++ b/internal/service/smarthttp/upload_pack.go @@ -11,6 +11,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/git" "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/metadata/featureflag" + "gitlab.com/gitlab-org/gitaly/internal/service/inspect" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/streamio" "google.golang.org/grpc/codes" @@ -60,11 +61,16 @@ func (s *server) PostUploadPack(stream gitalypb.SmartHTTPService_PostUploadPackS var respBytes int64 - stdout := streamio.NewWriter(func(p []byte) error { + stdoutWriter := streamio.NewWriter(func(p []byte) error { respBytes += int64(len(p)) return stream.Send(&gitalypb.PostUploadPackResponse{Data: p}) }) + // TODO: it is first step of the https://gitlab.com/gitlab-org/gitaly/issues/1519 + // needs to be removed after we get some statistics on this + stdout := inspect.NewWriter(stdoutWriter, inspect.LogPackInfoStatistic(ctx)) + defer stdout.Close() + env := git.AddGitProtocolEnv(ctx, req, command.GitEnv) repoPath, err := helper.GetRepoPath(req.Repository) diff --git a/internal/service/ssh/upload_pack.go b/internal/service/ssh/upload_pack.go index 3973a0955..b58b4edd1 100644 --- a/internal/service/ssh/upload_pack.go +++ b/internal/service/ssh/upload_pack.go @@ -8,6 +8,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/git" "gitlab.com/gitlab-org/gitaly/internal/git/pktline" "gitlab.com/gitlab-org/gitaly/internal/helper" + "gitlab.com/gitlab-org/gitaly/internal/service/inspect" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/streamio" ) @@ -37,9 +38,15 @@ func (s *server) sshUploadPack(stream gitalypb.SSHService_SSHUploadPackServer, r request, err := stream.Recv() return request.GetStdin(), err }) - stdout := streamio.NewWriter(func(p []byte) error { + + stdoutWriter := streamio.NewWriter(func(p []byte) error { return stream.Send(&gitalypb.SSHUploadPackResponse{Stdout: p}) }) + // TODO: it is first step of the https://gitlab.com/gitlab-org/gitaly/issues/1519 + // needs to be removed after we get some statistics on this + stdout := inspect.NewWriter(stdoutWriter, inspect.LogPackInfoStatistic(ctx)) + defer stdout.Close() + stderr := streamio.NewWriter(func(p []byte) error { return stream.Send(&gitalypb.SSHUploadPackResponse{Stderr: p}) }) |