Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2021-06-22 13:19:43 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2021-06-30 13:03:46 +0300
commitf1f4e68c5c3370bafe48f496dc2e6be5aa846ccd (patch)
tree64aa983171fdf4e64c678af84e1c43087f0f2ef1
parent75ccb2ef11efbf803c8b4b9fb457dfb5ea78eb32 (diff)
stats: Implement support for parsing git-send-pack(1) responses
While we currently only support the reference discovery phase of a push, a subsequent commit will also implement support for sending the packfile and commands to update remote references. In order to extract metrics from responses, this commit introduces logic to parse data returned by git-send-pack(1).
-rw-r--r--internal/git/stats/send_pack.go108
-rw-r--r--internal/git/stats/send_pack_test.go100
2 files changed, 208 insertions, 0 deletions
diff --git a/internal/git/stats/send_pack.go b/internal/git/stats/send_pack.go
new file mode 100644
index 000000000..53271377f
--- /dev/null
+++ b/internal/git/stats/send_pack.go
@@ -0,0 +1,108 @@
+package stats
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+ "io"
+ "time"
+
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/pktline"
+)
+
+// SendPack is used to parse the response body of a git-send-pack(1) request.
+type SendPack struct {
+ // ReportProgress is an optional callback set by the caller. If set, this function
+ // will be called for all received progress packets.
+ ReportProgress func([]byte)
+
+ updatedRefs int
+ packets int
+ largestPacketSize int
+ unpackOK time.Time
+ responseBody time.Time
+ multiband map[string]*bandInfo
+}
+
+// Parse parses the server response from a git-send-pack(1) request. The expected response is
+// documented in git.git:Documentation/techincal/pack-protocol.txt. We expect that the report-status
+// capability is active and thus don't support the report-status-v2 protocol.
+func (s *SendPack) Parse(body io.Reader) error {
+ s.multiband = make(map[string]*bandInfo)
+ for _, band := range Bands() {
+ s.multiband[band] = &bandInfo{}
+ }
+
+ scanner := pktline.NewScanner(body)
+ seenFlush := false
+ seenPack := false
+
+ // We're requesting the side-band-64k capability and thus receive a multiplexed stream from
+ // git-receive-pack(1). The first byte of each pktline will identify the multiplexing band.
+ for ; scanner.Scan(); s.packets++ {
+ if seenFlush {
+ return errors.New("received extra packet after flush")
+ }
+ if pktline.IsFlush(scanner.Bytes()) {
+ seenFlush = true
+ continue
+ }
+
+ data := pktline.Data(scanner.Bytes())
+ if len(data) == 0 {
+ return errors.New("empty packet in PACK data")
+ }
+ if len(data) > s.largestPacketSize {
+ s.largestPacketSize = len(data)
+ }
+
+ band, err := bandToHuman(data[0])
+ if err != nil {
+ return fmt.Errorf("converting band: %w", err)
+ }
+
+ s.multiband[band].consume(data)
+ data = data[1:]
+
+ switch band {
+ case bandPack:
+ // The pack band contains our "normal" pktlines which are nested into the
+ // outer pktline.
+ scanner := pktline.NewScanner(bytes.NewReader(data))
+ for scanner.Scan() {
+ if pktline.IsFlush(scanner.Bytes()) {
+ break
+ }
+
+ data := pktline.Data(scanner.Bytes())
+ if !seenPack {
+ if !bytes.Equal(data, []byte("unpack ok\n")) {
+ return fmt.Errorf("expected unpack ok, got %q", data)
+ }
+ s.unpackOK = time.Now()
+ seenPack = true
+ } else {
+ if bytes.HasPrefix(data, []byte("ok ")) {
+ s.updatedRefs++
+ } else if bytes.HasPrefix(data, []byte("ng ")) {
+ return fmt.Errorf("reference update failed: %q", data)
+ } else {
+ return fmt.Errorf("unsupported packet line: %q", data)
+ }
+ }
+ }
+ case bandProgress:
+ if s.ReportProgress != nil {
+ s.ReportProgress(data)
+ }
+ case bandError:
+ return fmt.Errorf("received error: %q", data)
+ default:
+ return fmt.Errorf("unsupported band: %q", band)
+ }
+ }
+
+ s.responseBody = time.Now()
+
+ return nil
+}
diff --git a/internal/git/stats/send_pack_test.go b/internal/git/stats/send_pack_test.go
new file mode 100644
index 000000000..6dd126e85
--- /dev/null
+++ b/internal/git/stats/send_pack_test.go
@@ -0,0 +1,100 @@
+package stats
+
+import (
+ "bytes"
+ "errors"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
+)
+
+func TestSendPack_Parse(t *testing.T) {
+ t.Run("valid input", func(t *testing.T) {
+ var progress []byte
+ sendPack := &SendPack{
+ ReportProgress: func(p []byte) {
+ progress = append(progress, p...)
+ },
+ }
+
+ var response bytes.Buffer
+
+ startTime := time.Now()
+
+ gittest.WritePktlineString(t, &response, "\x01000eunpack ok\n0015ok refs/heads/branch\n0000")
+ gittest.WritePktlineString(t, &response, "\x010013ok refs/heads/other0000\n")
+ gittest.WritePktlineString(t, &response, "\x02progress-bytes")
+ gittest.WritePktlineFlush(t, &response)
+
+ err := sendPack.Parse(&response)
+ require.NoError(t, err)
+
+ endTime := time.Now()
+
+ require.Equal(t, 4, sendPack.packets)
+ require.Equal(t, 44, sendPack.largestPacketSize)
+
+ for _, band := range []string{"pack", "progress"} {
+ require.True(t, startTime.Before(sendPack.multiband[band].firstPacket))
+ require.True(t, endTime.After(sendPack.multiband[band].firstPacket))
+ sendPack.multiband[band].firstPacket = startTime
+ }
+
+ require.Equal(t, &bandInfo{
+ firstPacket: startTime,
+ size: 73,
+ packets: 2,
+ }, sendPack.multiband["pack"])
+
+ require.Equal(t, &bandInfo{
+ firstPacket: startTime,
+ size: 15,
+ packets: 1,
+ }, sendPack.multiband["progress"])
+
+ require.Equal(t, &bandInfo{}, sendPack.multiband["error"])
+
+ require.Equal(t, "progress-bytes", string(progress))
+ })
+
+ t.Run("data after flush", func(t *testing.T) {
+ var response bytes.Buffer
+ gittest.WritePktlineString(t, &response, "\x01000eunpack ok\n0000")
+ gittest.WritePktlineFlush(t, &response)
+ gittest.WritePktlineString(t, &response, "\x01somethingsomething")
+
+ err := (&SendPack{}).Parse(&response)
+ require.Equal(t, errors.New("received extra packet after flush"), err)
+ })
+
+ t.Run("unpack error", func(t *testing.T) {
+ var response bytes.Buffer
+ gittest.WritePktlineString(t, &response, "\x010011unpack error\n0000")
+
+ err := (&SendPack{}).Parse(&response)
+ require.Equal(t, errors.New("expected unpack ok, got \"unpack error\\n\""), err)
+ })
+
+ t.Run("error sideband", func(t *testing.T) {
+ var response bytes.Buffer
+ gittest.WritePktlineString(t, &response, "\x01000eunpack ok\n0000")
+ gittest.WritePktlineString(t, &response, "\x03error-bytes")
+ gittest.WritePktlineFlush(t, &response)
+
+ err := (&SendPack{}).Parse(&response)
+ require.Equal(t, errors.New("received error: \"error-bytes\""), err)
+ })
+
+ t.Run("failed reference update", func(t *testing.T) {
+ var response bytes.Buffer
+ gittest.WritePktlineString(t, &response, "\x01000eunpack ok\n0000")
+ gittest.WritePktlineString(t, &response, "\x010014ok refs/heads/branch0000")
+ gittest.WritePktlineString(t, &response, "\x010021ng refs/heads/feature failure0000")
+ gittest.WritePktlineFlush(t, &response)
+
+ err := (&SendPack{}).Parse(&response)
+ require.Equal(t, errors.New("reference update failed: \"ng refs/heads/feature failure\""), err)
+ })
+}