diff options
author | Jacob Vosmaer <jacob@gitlab.com> | 2020-03-23 16:40:50 +0300 |
---|---|---|
committer | Jacob Vosmaer <jacob@gitlab.com> | 2020-03-23 16:40:50 +0300 |
commit | 838937325bc4c2cfd0e99dd1fc463dd703770b56 (patch) | |
tree | 1b79f0b139ff4fd996acec963bd93a820c703622 | |
parent | 87a01658a1c34c398d18584a8b0ffcd6e8bca266 (diff) | |
parent | fd38ae5693bae92e3dc9e9b6affbfaf4d48ca437 (diff) |
Merge branch 'pks-git-stats-reference-discovery' into 'master'
Extract reference discovery parsing from HTTP analysis code
See merge request gitlab-org/gitaly!1937
-rw-r--r-- | cmd/gitaly-debug/analyzehttp.go | 6 | ||||
-rw-r--r-- | internal/blackbox/blackbox.go | 2 | ||||
-rw-r--r-- | internal/git/stats/analyzehttp.go | 113 | ||||
-rw-r--r-- | internal/git/stats/analyzehttp_test.go | 6 | ||||
-rw-r--r-- | internal/git/stats/reference_discovery.go | 137 | ||||
-rw-r--r-- | internal/git/stats/reference_discovery_test.go | 79 | ||||
-rw-r--r-- | internal/service/smarthttp/inforefs_test.go | 6 |
7 files changed, 235 insertions, 114 deletions
diff --git a/cmd/gitaly-debug/analyzehttp.go b/cmd/gitaly-debug/analyzehttp.go index bb2492d17..2088acdc5 100644 --- a/cmd/gitaly-debug/analyzehttp.go +++ b/cmd/gitaly-debug/analyzehttp.go @@ -20,9 +20,9 @@ func analyzeHTTPClone(cloneURL string) { {"response header time", st.Get.ResponseHeader()}, {"first Git packet", st.Get.FirstGitPacket()}, {"response body time", st.Get.ResponseBody()}, - {"payload size", st.Get.PayloadSize()}, - {"Git packets received", st.Get.Packets()}, - {"refs advertised", len(st.Get.Refs())}, + {"payload size", st.Get.PayloadSize}, + {"Git packets received", st.Get.Packets}, + {"refs advertised", len(st.Get.Refs)}, {"wanted refs", st.RefsWanted()}, } { entry.print() diff --git a/internal/blackbox/blackbox.go b/internal/blackbox/blackbox.go index b3efcfa68..db3242c88 100644 --- a/internal/blackbox/blackbox.go +++ b/internal/blackbox/blackbox.go @@ -64,7 +64,7 @@ func doProbe(probe Probe) { setGauge(getFirstPacket, clone.Get.FirstGitPacket().Seconds()) setGauge(getTotalTime, clone.Get.ResponseBody().Seconds()) - setGauge(getAdvertisedRefs, float64(len(clone.Get.Refs()))) + setGauge(getAdvertisedRefs, float64(len(clone.Get.Refs))) setGauge(wantedRefs, float64(clone.RefsWanted())) setGauge(postTotalTime, clone.Post.ResponseBody().Seconds()) setGauge(postFirstProgressPacket, clone.Post.BandFirstPacket("progress").Seconds()) diff --git a/internal/git/stats/analyzehttp.go b/internal/git/stats/analyzehttp.go index c5c776a78..7b7303c4b 100644 --- a/internal/git/stats/analyzehttp.go +++ b/internal/git/stats/analyzehttp.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "io" + "io/ioutil" "net/http" "os" "strings" @@ -48,116 +49,17 @@ func ctxErr(ctx context.Context, err error) error { return err } -type Reference struct { - Oid, Name string -} - type Get struct { start time.Time responseHeader time.Duration httpStatus int - firstGitPacket time.Duration - responseBody time.Duration - payloadSize int64 - packets int - refs []Reference - caps []string + ReferenceDiscovery } func (g *Get) ResponseHeader() time.Duration { return g.responseHeader } func (g *Get) HTTPStatus() int { return g.httpStatus } -func (g *Get) FirstGitPacket() time.Duration { return g.firstGitPacket } -func (g *Get) ResponseBody() time.Duration { return g.responseBody } -func (g *Get) PayloadSize() int64 { return g.payloadSize } -func (g *Get) Packets() int { return g.packets } -func (g *Get) Refs() []Reference { return g.refs } -func (g *Get) Caps() []string { return g.caps } - -type uploadPackState int - -const ( - uploadPackExpectService uploadPackState = iota - uploadPackExpectFlush - uploadPackExpectRefWithCaps - uploadPackExpectRef - uploadPackExpectEnd -) - -// Expected response: -// - "# service=git-upload-pack\n" -// - FLUSH -// - "<OID> <ref>\x00<capabilities>\n" -// - "<OID> <ref>\n" -// - ... -// - FLUSH -func (g *Get) Parse(body io.Reader) error { - state := uploadPackExpectService - scanner := pktline.NewScanner(body) - - for ; scanner.Scan(); g.packets++ { - pkt := scanner.Bytes() - data := pktline.Data(pkt) - g.payloadSize += int64(len(data)) - - switch state { - case uploadPackExpectService: - g.firstGitPacket = time.Since(g.start) - header := string(data) - if header != "# service=git-upload-pack\n" { - return fmt.Errorf("unexpected header %q", header) - } - - state = uploadPackExpectFlush - case uploadPackExpectFlush: - if !pktline.IsFlush(pkt) { - return errors.New("missing flush after service announcement") - } - - state = uploadPackExpectRefWithCaps - case uploadPackExpectRefWithCaps: - split := bytes.Split(data, []byte{0}) - if len(split) != 2 { - return errors.New("invalid first reference line") - } - g.caps = strings.Split(string(split[1]), " ") - - ref := strings.SplitN(string(split[0]), " ", 2) - if len(ref) != 2 { - continue - } - g.refs = append(g.refs, Reference{Oid: ref[0], Name: ref[1]}) - - state = uploadPackExpectRef - case uploadPackExpectRef: - if pktline.IsFlush(pkt) { - state = uploadPackExpectEnd - continue - } - - split := strings.SplitN(string(data), " ", 2) - if len(split) != 2 { - continue - } - g.refs = append(g.refs, Reference{Oid: split[0], Name: split[1]}) - case uploadPackExpectEnd: - return errors.New("received packet after flush") - } - } - - if err := scanner.Err(); err != nil { - return err - } - if len(g.refs) == 0 { - return errors.New("received no references") - } - if len(g.caps) == 0 { - return errors.New("received no capabilities") - } - - g.responseBody = time.Since(g.start) - - return nil -} +func (g *Get) FirstGitPacket() time.Duration { return g.FirstPacket.Sub(g.start) } +func (g *Get) ResponseBody() time.Duration { return g.LastPacket.Sub(g.start) } func (cl *Clone) doGet(ctx context.Context) error { req, err := http.NewRequest("GET", cl.URL+"/info/refs?service=git-upload-pack", nil) @@ -188,7 +90,10 @@ func (cl *Clone) doGet(ctx context.Context) error { if err != nil { return err } - defer resp.Body.Close() + defer func() { + io.Copy(ioutil.Discard, resp.Body) + resp.Body.Close() + }() if code := resp.StatusCode; code < 200 || code >= 400 { return fmt.Errorf("git http get: unexpected http status: %d", code) @@ -211,7 +116,7 @@ func (cl *Clone) doGet(ctx context.Context) error { return err } - for _, ref := range cl.Get.Refs() { + for _, ref := range cl.Get.Refs { if strings.HasPrefix(ref.Name, "refs/heads/") || strings.HasPrefix(ref.Name, "refs/tags/") { cl.wants = append(cl.wants, ref.Oid) } diff --git a/internal/git/stats/analyzehttp_test.go b/internal/git/stats/analyzehttp_test.go index 75b27eb12..c041db9b1 100644 --- a/internal/git/stats/analyzehttp_test.go +++ b/internal/git/stats/analyzehttp_test.go @@ -28,9 +28,9 @@ func TestClone(t *testing.T) { require.Greater(t, clone.RefsWanted(), expectedWants, "number of wanted refs") require.Equal(t, 200, clone.Get.HTTPStatus(), "get status") - require.Greater(t, clone.Get.Packets(), 0, "number of get packets") - require.Greater(t, clone.Get.PayloadSize(), int64(0), "get payload size") - require.Greater(t, len(clone.Get.Caps()), 10, "get capabilities") + require.Greater(t, clone.Get.Packets, 0, "number of get packets") + require.Greater(t, clone.Get.PayloadSize, int64(0), "get payload size") + require.Greater(t, len(clone.Get.Caps), 10, "get capabilities") previousValue := time.Duration(0) for _, m := range []struct { diff --git a/internal/git/stats/reference_discovery.go b/internal/git/stats/reference_discovery.go new file mode 100644 index 000000000..19e626b79 --- /dev/null +++ b/internal/git/stats/reference_discovery.go @@ -0,0 +1,137 @@ +package stats + +import ( + "errors" + "fmt" + "io" + "strings" + "time" + + "gitlab.com/gitlab-org/gitaly/internal/git/pktline" + "gitlab.com/gitlab-org/gitaly/internal/helper/text" +) + +// Reference as used by the reference discovery protocol +type Reference struct { + // Oid is the object ID the reference points to + Oid string + // Name of the reference. The name will be suffixed with ^{} in case + // the reference is the peeled commit. + Name string +} + +// ReferenceDiscovery contains information about a reference discovery session. +type ReferenceDiscovery struct { + // FirstPacket tracks the time when the first pktline was received + FirstPacket time.Time + // LastPacket tracks the time when the last pktline was received + LastPacket time.Time + // PayloadSize tracks the size of all pktlines' data + PayloadSize int64 + // Packets tracks the total number of packets consumed + Packets int + // Refs contains all announced references + Refs []Reference + // Caps contains all supported capabilities + Caps []string +} + +type referenceDiscoveryState int + +const ( + referenceDiscoveryExpectService referenceDiscoveryState = iota + referenceDiscoveryExpectFlush + referenceDiscoveryExpectRefWithCaps + referenceDiscoveryExpectRef + referenceDiscoveryExpectEnd +) + +// ParseReferenceDiscovery parses a client's reference discovery stream and +// returns either information about the reference discovery or an error in case +// it couldn't make sense of the client's request. +func ParseReferenceDiscovery(body io.Reader) (ReferenceDiscovery, error) { + d := ReferenceDiscovery{} + return d, d.Parse(body) +} + +// Parse parses a client's reference discovery stream into the given +// ReferenceDiscovery struct or returns an error in case it couldn't make sense +// of the client's request. +// +// Expected protocol: +// - "# service=git-upload-pack\n" +// - FLUSH +// - "<OID> <ref>\x00<capabilities>\n" +// - "<OID> <ref>\n" +// - ... +// - FLUSH +func (d *ReferenceDiscovery) Parse(body io.Reader) error { + state := referenceDiscoveryExpectService + scanner := pktline.NewScanner(body) + + for ; scanner.Scan(); d.Packets++ { + pkt := scanner.Bytes() + data := text.ChompBytes(pktline.Data(pkt)) + d.PayloadSize += int64(len(data)) + + switch state { + case referenceDiscoveryExpectService: + d.FirstPacket = time.Now() + if data != "# service=git-upload-pack" { + return fmt.Errorf("unexpected header %q", data) + } + + state = referenceDiscoveryExpectFlush + case referenceDiscoveryExpectFlush: + if !pktline.IsFlush(pkt) { + return errors.New("missing flush after service announcement") + } + + state = referenceDiscoveryExpectRefWithCaps + case referenceDiscoveryExpectRefWithCaps: + split := strings.SplitN(data, "\000", 2) + if len(split) != 2 { + return errors.New("invalid first reference line") + } + + ref := strings.SplitN(string(split[0]), " ", 2) + if len(ref) != 2 { + return errors.New("invalid reference line") + } + d.Refs = append(d.Refs, Reference{Oid: ref[0], Name: ref[1]}) + d.Caps = strings.Split(string(split[1]), " ") + + state = referenceDiscoveryExpectRef + case referenceDiscoveryExpectRef: + if pktline.IsFlush(pkt) { + state = referenceDiscoveryExpectEnd + continue + } + + split := strings.SplitN(data, " ", 2) + if len(split) != 2 { + return errors.New("invalid reference line") + } + d.Refs = append(d.Refs, Reference{Oid: split[0], Name: split[1]}) + case referenceDiscoveryExpectEnd: + return errors.New("received packet after flush") + } + } + + if err := scanner.Err(); err != nil { + return err + } + if len(d.Refs) == 0 { + return errors.New("received no references") + } + if len(d.Caps) == 0 { + return errors.New("received no capabilities") + } + if state != referenceDiscoveryExpectEnd { + return errors.New("discovery ended prematurely") + } + + d.LastPacket = time.Now() + + return nil +} diff --git a/internal/git/stats/reference_discovery_test.go b/internal/git/stats/reference_discovery_test.go new file mode 100644 index 000000000..00bc947e3 --- /dev/null +++ b/internal/git/stats/reference_discovery_test.go @@ -0,0 +1,79 @@ +package stats + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/git/pktline" +) + +func TestSingleRefParses(t *testing.T) { + buf := &bytes.Buffer{} + pktline.WriteString(buf, "# service=git-upload-pack\n") + pktline.WriteFlush(buf) + pktline.WriteString(buf, oid1+" HEAD\x00capability") + pktline.WriteFlush(buf) + + d, err := ParseReferenceDiscovery(buf) + require.NoError(t, err) + require.Equal(t, []string{"capability"}, d.Caps) + require.Equal(t, []Reference{{Oid: oid1, Name: "HEAD"}}, d.Refs) +} + +func TestMultipleRefsAndCapsParse(t *testing.T) { + buf := &bytes.Buffer{} + pktline.WriteString(buf, "# service=git-upload-pack\n") + pktline.WriteFlush(buf) + pktline.WriteString(buf, oid1+" HEAD\x00first second") + pktline.WriteString(buf, oid2+" refs/heads/master") + pktline.WriteFlush(buf) + + d, err := ParseReferenceDiscovery(buf) + require.NoError(t, err) + require.Equal(t, []string{"first", "second"}, d.Caps) + require.Equal(t, []Reference{{Oid: oid1, Name: "HEAD"}, {Oid: oid2, Name: "refs/heads/master"}}, d.Refs) +} + +func TestInvalidHeaderFails(t *testing.T) { + buf := &bytes.Buffer{} + pktline.WriteString(buf, "# service=invalid\n") + pktline.WriteFlush(buf) + pktline.WriteString(buf, oid1+" HEAD\x00caps") + pktline.WriteFlush(buf) + + _, err := ParseReferenceDiscovery(buf) + require.Error(t, err) +} + +func TestMissingRefsFail(t *testing.T) { + buf := &bytes.Buffer{} + pktline.WriteString(buf, "# service=git-upload-pack\n") + pktline.WriteFlush(buf) + pktline.WriteFlush(buf) + + _, err := ParseReferenceDiscovery(buf) + require.Error(t, err) +} + +func TestInvalidRefFail(t *testing.T) { + buf := &bytes.Buffer{} + pktline.WriteString(buf, "# service=git-upload-pack\n") + pktline.WriteFlush(buf) + pktline.WriteString(buf, oid1+" HEAD\x00caps") + pktline.WriteString(buf, oid2) + pktline.WriteFlush(buf) + + _, err := ParseReferenceDiscovery(buf) + require.Error(t, err) +} + +func TestMissingTrailingFlushFails(t *testing.T) { + buf := &bytes.Buffer{} + pktline.WriteString(buf, "# service=git-upload-pack\n") + pktline.WriteFlush(buf) + pktline.WriteString(buf, oid1+" HEAD\x00caps") + + d := ReferenceDiscovery{} + require.Error(t, d.Parse(buf)) +} diff --git a/internal/service/smarthttp/inforefs_test.go b/internal/service/smarthttp/inforefs_test.go index 899a39d3e..9606fb1c4 100644 --- a/internal/service/smarthttp/inforefs_test.go +++ b/internal/service/smarthttp/inforefs_test.go @@ -71,11 +71,11 @@ func TestSuccessfulInfoRefsUploadWithPartialClone(t *testing.T) { err = partialRefs.Parse(bytes.NewReader(partialResponse)) require.NoError(t, err) - require.Equal(t, fullRefs.Refs(), partialRefs.Refs()) + require.Equal(t, fullRefs.Refs, partialRefs.Refs) for _, c := range []string{"allow-tip-sha1-in-want", "allow-reachable-sha1-in-want", "filter"} { - require.Contains(t, partialRefs.Caps(), c) - require.NotContains(t, fullRefs.Caps(), c) + require.Contains(t, partialRefs.Caps, c) + require.NotContains(t, fullRefs.Caps, c) } } |