Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJacob Vosmaer <jacob@gitlab.com>2020-01-14 19:05:55 +0300
committerJacob Vosmaer <jacob@gitlab.com>2020-01-14 19:05:55 +0300
commit060f2f6ec6bee2c9b405d31dd49e3ecdd16f4697 (patch)
treeab02dc95a2f549bc90c92f17eafd4292a57d0c22
parent130308e43830fe42e7afc4d7bcea0f8af4c6ba5b (diff)
parent0c1d848739b72cf82e1dc62a497ba24f5fdc3225 (diff)
Merge branch 'ps-log-pack-stat-on-clone' into 'master'
Log statistics of pack re-use on clone and fetch Closes #2169 See merge request gitlab-org/gitaly!1743
-rw-r--r--changelogs/unreleased/ps-log-pack-stat-on-clone.yml5
-rw-r--r--internal/service/inspect/inspector.go68
-rw-r--r--internal/service/inspect/inspector_test.go112
-rw-r--r--internal/service/smarthttp/upload_pack.go8
-rw-r--r--internal/service/ssh/upload_pack.go9
5 files changed, 200 insertions, 2 deletions
diff --git a/changelogs/unreleased/ps-log-pack-stat-on-clone.yml b/changelogs/unreleased/ps-log-pack-stat-on-clone.yml
new file mode 100644
index 000000000..9c3f95f79
--- /dev/null
+++ b/changelogs/unreleased/ps-log-pack-stat-on-clone.yml
@@ -0,0 +1,5 @@
+---
+title: Log statistics of pack re-use on clone and fetch
+merge_request: 1743
+author:
+type: other
diff --git a/internal/service/inspect/inspector.go b/internal/service/inspect/inspector.go
new file mode 100644
index 000000000..fcd8a1aa7
--- /dev/null
+++ b/internal/service/inspect/inspector.go
@@ -0,0 +1,68 @@
+package inspect
+
+import (
+ "bytes"
+ "context"
+ "io"
+ "io/ioutil"
+
+ "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
+ "gitlab.com/gitlab-org/gitaly/internal/git/pktline"
+ "gitlab.com/gitlab-org/gitaly/internal/helper/text"
+)
+
+// NewWriter returns Writer that will feed 'action' with data on each write to it.
+// It is required to call Close once all data is processed.
+// Close will be blocked until action is completed.
+func NewWriter(writer io.Writer, action func(reader io.Reader)) io.WriteCloser {
+ pr, pw := io.Pipe()
+
+ multiOut := io.MultiWriter(writer, pw)
+ c := closer{c: pw, done: make(chan struct{})}
+
+ go func() {
+ defer close(c.done) // channel close signals that action is completed
+
+ action(pr)
+
+ _, _ = io.Copy(ioutil.Discard, pr) // consume all data to unblock multiOut
+ }()
+
+ return struct {
+ io.Writer
+ io.Closer
+ }{
+ Writer: multiOut,
+ Closer: c, // pw must be closed otherwise goroutine serving 'action' won't be terminated
+ }
+}
+
+type closer struct {
+ c io.Closer
+ done chan struct{}
+}
+
+// Close closes wrapped Closer and waits for done to be closed.
+func (c closer) Close() error {
+ defer func() { <-c.done }()
+ return c.c.Close()
+}
+
+// LogPackInfoStatistic inspect data stream for the informational messages
+// and logs info about pack file usage.
+func LogPackInfoStatistic(ctx context.Context) func(reader io.Reader) {
+ return func(reader io.Reader) {
+ logger := ctxlogrus.Extract(ctx)
+
+ scanner := pktline.NewScanner(reader)
+ for scanner.Scan() {
+ pktData := pktline.Data(scanner.Bytes())
+ if !bytes.HasPrefix(pktData, []byte("\x02Total ")) {
+ continue
+ }
+
+ logger.WithField("pack.stat", text.ChompBytes(pktData[1:])).Info("pack file compression statistic")
+ }
+ // we are not interested in scanner.Err()
+ }
+}
diff --git a/internal/service/inspect/inspector_test.go b/internal/service/inspect/inspector_test.go
new file mode 100644
index 000000000..751ace6e2
--- /dev/null
+++ b/internal/service/inspect/inspector_test.go
@@ -0,0 +1,112 @@
+package inspect
+
+import (
+ "bytes"
+ "context"
+ "errors"
+ "io"
+ "io/ioutil"
+ "strings"
+ "sync/atomic"
+ "testing"
+
+ "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
+ "github.com/sirupsen/logrus"
+ "github.com/stretchr/testify/require"
+)
+
+func TestWrite(t *testing.T) {
+ for _, tc := range []struct {
+ desc string
+ action func(io.Reader)
+ src io.Reader
+ exp []byte
+ expErrStr string
+ }{
+ {
+ desc: "all data consumed without errors",
+ action: func(reader io.Reader) {
+ data, err := ioutil.ReadAll(reader)
+ require.NoError(t, err)
+ require.Equal(t, []byte("somedata"), data)
+ },
+ src: strings.NewReader("somedata"),
+ exp: []byte("somedata"),
+ },
+ {
+ desc: "no data is ok",
+ action: func(reader io.Reader) {
+ data, err := ioutil.ReadAll(reader)
+ require.NoError(t, err)
+ require.Empty(t, data)
+ },
+ src: bytes.NewReader(nil),
+ exp: []byte{},
+ },
+ {
+ desc: "consumed by action partially",
+ action: func(reader io.Reader) {
+ b := make([]byte, 4)
+ n, err := reader.Read(b)
+ require.NoError(t, err)
+ require.Equal(t, 4, n)
+ require.Equal(t, []byte("some"), b)
+ },
+ src: strings.NewReader("somedata"),
+ exp: []byte("somedata"),
+ },
+ {
+ desc: "error on read",
+ action: func(reader io.Reader) {},
+ src: errReader("bad read"),
+ exp: []byte{},
+ expErrStr: "bad read",
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ mainWriter := &bytes.Buffer{}
+ var checked int32
+
+ writer := NewWriter(mainWriter, func(reader io.Reader) {
+ tc.action(reader)
+ atomic.StoreInt32(&checked, 1)
+ })
+
+ _, err := io.Copy(writer, tc.src)
+ if tc.expErrStr != "" {
+ require.EqualError(t, err, tc.expErrStr)
+ } else {
+ require.NoError(t, err)
+ }
+
+ data, err := ioutil.ReadAll(mainWriter)
+ require.NoError(t, err)
+ require.Equal(t, tc.exp, data)
+
+ require.NoError(t, writer.Close())
+ require.Equal(t, int32(1), atomic.LoadInt32(&checked))
+ })
+ }
+}
+
+type errReader string
+
+func (e errReader) Read(p []byte) (n int, err error) {
+ return 0, errors.New(string(e))
+}
+
+func TestLogPackInfoStatistic(t *testing.T) {
+ dest := &bytes.Buffer{}
+ log := &logrus.Logger{
+ Out: dest,
+ Formatter: new(logrus.JSONFormatter),
+ Level: logrus.InfoLevel,
+ }
+ ctx := ctxlogrus.ToContext(context.Background(), log.WithField("test", "logging"))
+
+ logging := LogPackInfoStatistic(ctx)
+ logging(strings.NewReader("0038\x41ACK 1e292f8fedd741b75372e19097c76d327140c312 ready\n0035\x02Total 1044 (delta 519), reused 1035 (delta 512)\n0038\x41ACK 1e292f8fedd741b75372e19097c76d327140c312 ready\n0000\x01"))
+
+ require.Contains(t, dest.String(), "Total 1044 (delta 519), reused 1035 (delta 512)")
+ require.NotContains(t, dest.String(), "ACK 1e292f8fedd741b75372e19097c76d327140c312")
+}
diff --git a/internal/service/smarthttp/upload_pack.go b/internal/service/smarthttp/upload_pack.go
index 4bb8dc327..6d20eb0cd 100644
--- a/internal/service/smarthttp/upload_pack.go
+++ b/internal/service/smarthttp/upload_pack.go
@@ -11,6 +11,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/git"
"gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/metadata/featureflag"
+ "gitlab.com/gitlab-org/gitaly/internal/service/inspect"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/streamio"
"google.golang.org/grpc/codes"
@@ -60,11 +61,16 @@ func (s *server) PostUploadPack(stream gitalypb.SmartHTTPService_PostUploadPackS
var respBytes int64
- stdout := streamio.NewWriter(func(p []byte) error {
+ stdoutWriter := streamio.NewWriter(func(p []byte) error {
respBytes += int64(len(p))
return stream.Send(&gitalypb.PostUploadPackResponse{Data: p})
})
+ // TODO: it is first step of the https://gitlab.com/gitlab-org/gitaly/issues/1519
+ // needs to be removed after we get some statistics on this
+ stdout := inspect.NewWriter(stdoutWriter, inspect.LogPackInfoStatistic(ctx))
+ defer stdout.Close()
+
env := git.AddGitProtocolEnv(ctx, req, command.GitEnv)
repoPath, err := helper.GetRepoPath(req.Repository)
diff --git a/internal/service/ssh/upload_pack.go b/internal/service/ssh/upload_pack.go
index 3973a0955..b58b4edd1 100644
--- a/internal/service/ssh/upload_pack.go
+++ b/internal/service/ssh/upload_pack.go
@@ -8,6 +8,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/git"
"gitlab.com/gitlab-org/gitaly/internal/git/pktline"
"gitlab.com/gitlab-org/gitaly/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/internal/service/inspect"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/streamio"
)
@@ -37,9 +38,15 @@ func (s *server) sshUploadPack(stream gitalypb.SSHService_SSHUploadPackServer, r
request, err := stream.Recv()
return request.GetStdin(), err
})
- stdout := streamio.NewWriter(func(p []byte) error {
+
+ stdoutWriter := streamio.NewWriter(func(p []byte) error {
return stream.Send(&gitalypb.SSHUploadPackResponse{Stdout: p})
})
+ // TODO: it is first step of the https://gitlab.com/gitlab-org/gitaly/issues/1519
+ // needs to be removed after we get some statistics on this
+ stdout := inspect.NewWriter(stdoutWriter, inspect.LogPackInfoStatistic(ctx))
+ defer stdout.Close()
+
stderr := streamio.NewWriter(func(p []byte) error {
return stream.Send(&gitalypb.SSHUploadPackResponse{Stderr: p})
})