diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-06-22 13:19:43 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-06-30 13:03:46 +0300 |
commit | f1f4e68c5c3370bafe48f496dc2e6be5aa846ccd (patch) | |
tree | 64aa983171fdf4e64c678af84e1c43087f0f2ef1 | |
parent | 75ccb2ef11efbf803c8b4b9fb457dfb5ea78eb32 (diff) |
stats: Implement support for parsing git-send-pack(1) responses
While we currently only support the reference discovery phase of a push,
a subsequent commit will also implement support for sending the packfile
and commands to update remote references. In order to extract metrics
from responses, this commit introduces logic to parse data returned by
git-send-pack(1).
-rw-r--r-- | internal/git/stats/send_pack.go | 108 | ||||
-rw-r--r-- | internal/git/stats/send_pack_test.go | 100 |
2 files changed, 208 insertions, 0 deletions
diff --git a/internal/git/stats/send_pack.go b/internal/git/stats/send_pack.go new file mode 100644 index 000000000..53271377f --- /dev/null +++ b/internal/git/stats/send_pack.go @@ -0,0 +1,108 @@ +package stats + +import ( + "bytes" + "errors" + "fmt" + "io" + "time" + + "gitlab.com/gitlab-org/gitaly/v14/internal/git/pktline" +) + +// SendPack is used to parse the response body of a git-send-pack(1) request. +type SendPack struct { + // ReportProgress is an optional callback set by the caller. If set, this function + // will be called for all received progress packets. + ReportProgress func([]byte) + + updatedRefs int + packets int + largestPacketSize int + unpackOK time.Time + responseBody time.Time + multiband map[string]*bandInfo +} + +// Parse parses the server response from a git-send-pack(1) request. The expected response is +// documented in git.git:Documentation/techincal/pack-protocol.txt. We expect that the report-status +// capability is active and thus don't support the report-status-v2 protocol. +func (s *SendPack) Parse(body io.Reader) error { + s.multiband = make(map[string]*bandInfo) + for _, band := range Bands() { + s.multiband[band] = &bandInfo{} + } + + scanner := pktline.NewScanner(body) + seenFlush := false + seenPack := false + + // We're requesting the side-band-64k capability and thus receive a multiplexed stream from + // git-receive-pack(1). The first byte of each pktline will identify the multiplexing band. + for ; scanner.Scan(); s.packets++ { + if seenFlush { + return errors.New("received extra packet after flush") + } + if pktline.IsFlush(scanner.Bytes()) { + seenFlush = true + continue + } + + data := pktline.Data(scanner.Bytes()) + if len(data) == 0 { + return errors.New("empty packet in PACK data") + } + if len(data) > s.largestPacketSize { + s.largestPacketSize = len(data) + } + + band, err := bandToHuman(data[0]) + if err != nil { + return fmt.Errorf("converting band: %w", err) + } + + s.multiband[band].consume(data) + data = data[1:] + + switch band { + case bandPack: + // The pack band contains our "normal" pktlines which are nested into the + // outer pktline. + scanner := pktline.NewScanner(bytes.NewReader(data)) + for scanner.Scan() { + if pktline.IsFlush(scanner.Bytes()) { + break + } + + data := pktline.Data(scanner.Bytes()) + if !seenPack { + if !bytes.Equal(data, []byte("unpack ok\n")) { + return fmt.Errorf("expected unpack ok, got %q", data) + } + s.unpackOK = time.Now() + seenPack = true + } else { + if bytes.HasPrefix(data, []byte("ok ")) { + s.updatedRefs++ + } else if bytes.HasPrefix(data, []byte("ng ")) { + return fmt.Errorf("reference update failed: %q", data) + } else { + return fmt.Errorf("unsupported packet line: %q", data) + } + } + } + case bandProgress: + if s.ReportProgress != nil { + s.ReportProgress(data) + } + case bandError: + return fmt.Errorf("received error: %q", data) + default: + return fmt.Errorf("unsupported band: %q", band) + } + } + + s.responseBody = time.Now() + + return nil +} diff --git a/internal/git/stats/send_pack_test.go b/internal/git/stats/send_pack_test.go new file mode 100644 index 000000000..6dd126e85 --- /dev/null +++ b/internal/git/stats/send_pack_test.go @@ -0,0 +1,100 @@ +package stats + +import ( + "bytes" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" +) + +func TestSendPack_Parse(t *testing.T) { + t.Run("valid input", func(t *testing.T) { + var progress []byte + sendPack := &SendPack{ + ReportProgress: func(p []byte) { + progress = append(progress, p...) + }, + } + + var response bytes.Buffer + + startTime := time.Now() + + gittest.WritePktlineString(t, &response, "\x01000eunpack ok\n0015ok refs/heads/branch\n0000") + gittest.WritePktlineString(t, &response, "\x010013ok refs/heads/other0000\n") + gittest.WritePktlineString(t, &response, "\x02progress-bytes") + gittest.WritePktlineFlush(t, &response) + + err := sendPack.Parse(&response) + require.NoError(t, err) + + endTime := time.Now() + + require.Equal(t, 4, sendPack.packets) + require.Equal(t, 44, sendPack.largestPacketSize) + + for _, band := range []string{"pack", "progress"} { + require.True(t, startTime.Before(sendPack.multiband[band].firstPacket)) + require.True(t, endTime.After(sendPack.multiband[band].firstPacket)) + sendPack.multiband[band].firstPacket = startTime + } + + require.Equal(t, &bandInfo{ + firstPacket: startTime, + size: 73, + packets: 2, + }, sendPack.multiband["pack"]) + + require.Equal(t, &bandInfo{ + firstPacket: startTime, + size: 15, + packets: 1, + }, sendPack.multiband["progress"]) + + require.Equal(t, &bandInfo{}, sendPack.multiband["error"]) + + require.Equal(t, "progress-bytes", string(progress)) + }) + + t.Run("data after flush", func(t *testing.T) { + var response bytes.Buffer + gittest.WritePktlineString(t, &response, "\x01000eunpack ok\n0000") + gittest.WritePktlineFlush(t, &response) + gittest.WritePktlineString(t, &response, "\x01somethingsomething") + + err := (&SendPack{}).Parse(&response) + require.Equal(t, errors.New("received extra packet after flush"), err) + }) + + t.Run("unpack error", func(t *testing.T) { + var response bytes.Buffer + gittest.WritePktlineString(t, &response, "\x010011unpack error\n0000") + + err := (&SendPack{}).Parse(&response) + require.Equal(t, errors.New("expected unpack ok, got \"unpack error\\n\""), err) + }) + + t.Run("error sideband", func(t *testing.T) { + var response bytes.Buffer + gittest.WritePktlineString(t, &response, "\x01000eunpack ok\n0000") + gittest.WritePktlineString(t, &response, "\x03error-bytes") + gittest.WritePktlineFlush(t, &response) + + err := (&SendPack{}).Parse(&response) + require.Equal(t, errors.New("received error: \"error-bytes\""), err) + }) + + t.Run("failed reference update", func(t *testing.T) { + var response bytes.Buffer + gittest.WritePktlineString(t, &response, "\x01000eunpack ok\n0000") + gittest.WritePktlineString(t, &response, "\x010014ok refs/heads/branch0000") + gittest.WritePktlineString(t, &response, "\x010021ng refs/heads/feature failure0000") + gittest.WritePktlineFlush(t, &response) + + err := (&SendPack{}).Parse(&response) + require.Equal(t, errors.New("reference update failed: \"ng refs/heads/feature failure\""), err) + }) +} |