Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJacob Vosmaer <jacob@gitlab.com>2020-03-23 16:40:50 +0300
committerJacob Vosmaer <jacob@gitlab.com>2020-03-23 16:40:50 +0300
commit838937325bc4c2cfd0e99dd1fc463dd703770b56 (patch)
tree1b79f0b139ff4fd996acec963bd93a820c703622
parent87a01658a1c34c398d18584a8b0ffcd6e8bca266 (diff)
parentfd38ae5693bae92e3dc9e9b6affbfaf4d48ca437 (diff)
Merge branch 'pks-git-stats-reference-discovery' into 'master'
Extract reference discovery parsing from HTTP analysis code See merge request gitlab-org/gitaly!1937
-rw-r--r--cmd/gitaly-debug/analyzehttp.go6
-rw-r--r--internal/blackbox/blackbox.go2
-rw-r--r--internal/git/stats/analyzehttp.go113
-rw-r--r--internal/git/stats/analyzehttp_test.go6
-rw-r--r--internal/git/stats/reference_discovery.go137
-rw-r--r--internal/git/stats/reference_discovery_test.go79
-rw-r--r--internal/service/smarthttp/inforefs_test.go6
7 files changed, 235 insertions, 114 deletions
diff --git a/cmd/gitaly-debug/analyzehttp.go b/cmd/gitaly-debug/analyzehttp.go
index bb2492d17..2088acdc5 100644
--- a/cmd/gitaly-debug/analyzehttp.go
+++ b/cmd/gitaly-debug/analyzehttp.go
@@ -20,9 +20,9 @@ func analyzeHTTPClone(cloneURL string) {
{"response header time", st.Get.ResponseHeader()},
{"first Git packet", st.Get.FirstGitPacket()},
{"response body time", st.Get.ResponseBody()},
- {"payload size", st.Get.PayloadSize()},
- {"Git packets received", st.Get.Packets()},
- {"refs advertised", len(st.Get.Refs())},
+ {"payload size", st.Get.PayloadSize},
+ {"Git packets received", st.Get.Packets},
+ {"refs advertised", len(st.Get.Refs)},
{"wanted refs", st.RefsWanted()},
} {
entry.print()
diff --git a/internal/blackbox/blackbox.go b/internal/blackbox/blackbox.go
index b3efcfa68..db3242c88 100644
--- a/internal/blackbox/blackbox.go
+++ b/internal/blackbox/blackbox.go
@@ -64,7 +64,7 @@ func doProbe(probe Probe) {
setGauge(getFirstPacket, clone.Get.FirstGitPacket().Seconds())
setGauge(getTotalTime, clone.Get.ResponseBody().Seconds())
- setGauge(getAdvertisedRefs, float64(len(clone.Get.Refs())))
+ setGauge(getAdvertisedRefs, float64(len(clone.Get.Refs)))
setGauge(wantedRefs, float64(clone.RefsWanted()))
setGauge(postTotalTime, clone.Post.ResponseBody().Seconds())
setGauge(postFirstProgressPacket, clone.Post.BandFirstPacket("progress").Seconds())
diff --git a/internal/git/stats/analyzehttp.go b/internal/git/stats/analyzehttp.go
index c5c776a78..7b7303c4b 100644
--- a/internal/git/stats/analyzehttp.go
+++ b/internal/git/stats/analyzehttp.go
@@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"io"
+ "io/ioutil"
"net/http"
"os"
"strings"
@@ -48,116 +49,17 @@ func ctxErr(ctx context.Context, err error) error {
return err
}
-type Reference struct {
- Oid, Name string
-}
-
type Get struct {
start time.Time
responseHeader time.Duration
httpStatus int
- firstGitPacket time.Duration
- responseBody time.Duration
- payloadSize int64
- packets int
- refs []Reference
- caps []string
+ ReferenceDiscovery
}
func (g *Get) ResponseHeader() time.Duration { return g.responseHeader }
func (g *Get) HTTPStatus() int { return g.httpStatus }
-func (g *Get) FirstGitPacket() time.Duration { return g.firstGitPacket }
-func (g *Get) ResponseBody() time.Duration { return g.responseBody }
-func (g *Get) PayloadSize() int64 { return g.payloadSize }
-func (g *Get) Packets() int { return g.packets }
-func (g *Get) Refs() []Reference { return g.refs }
-func (g *Get) Caps() []string { return g.caps }
-
-type uploadPackState int
-
-const (
- uploadPackExpectService uploadPackState = iota
- uploadPackExpectFlush
- uploadPackExpectRefWithCaps
- uploadPackExpectRef
- uploadPackExpectEnd
-)
-
-// Expected response:
-// - "# service=git-upload-pack\n"
-// - FLUSH
-// - "<OID> <ref>\x00<capabilities>\n"
-// - "<OID> <ref>\n"
-// - ...
-// - FLUSH
-func (g *Get) Parse(body io.Reader) error {
- state := uploadPackExpectService
- scanner := pktline.NewScanner(body)
-
- for ; scanner.Scan(); g.packets++ {
- pkt := scanner.Bytes()
- data := pktline.Data(pkt)
- g.payloadSize += int64(len(data))
-
- switch state {
- case uploadPackExpectService:
- g.firstGitPacket = time.Since(g.start)
- header := string(data)
- if header != "# service=git-upload-pack\n" {
- return fmt.Errorf("unexpected header %q", header)
- }
-
- state = uploadPackExpectFlush
- case uploadPackExpectFlush:
- if !pktline.IsFlush(pkt) {
- return errors.New("missing flush after service announcement")
- }
-
- state = uploadPackExpectRefWithCaps
- case uploadPackExpectRefWithCaps:
- split := bytes.Split(data, []byte{0})
- if len(split) != 2 {
- return errors.New("invalid first reference line")
- }
- g.caps = strings.Split(string(split[1]), " ")
-
- ref := strings.SplitN(string(split[0]), " ", 2)
- if len(ref) != 2 {
- continue
- }
- g.refs = append(g.refs, Reference{Oid: ref[0], Name: ref[1]})
-
- state = uploadPackExpectRef
- case uploadPackExpectRef:
- if pktline.IsFlush(pkt) {
- state = uploadPackExpectEnd
- continue
- }
-
- split := strings.SplitN(string(data), " ", 2)
- if len(split) != 2 {
- continue
- }
- g.refs = append(g.refs, Reference{Oid: split[0], Name: split[1]})
- case uploadPackExpectEnd:
- return errors.New("received packet after flush")
- }
- }
-
- if err := scanner.Err(); err != nil {
- return err
- }
- if len(g.refs) == 0 {
- return errors.New("received no references")
- }
- if len(g.caps) == 0 {
- return errors.New("received no capabilities")
- }
-
- g.responseBody = time.Since(g.start)
-
- return nil
-}
+func (g *Get) FirstGitPacket() time.Duration { return g.FirstPacket.Sub(g.start) }
+func (g *Get) ResponseBody() time.Duration { return g.LastPacket.Sub(g.start) }
func (cl *Clone) doGet(ctx context.Context) error {
req, err := http.NewRequest("GET", cl.URL+"/info/refs?service=git-upload-pack", nil)
@@ -188,7 +90,10 @@ func (cl *Clone) doGet(ctx context.Context) error {
if err != nil {
return err
}
- defer resp.Body.Close()
+ defer func() {
+ io.Copy(ioutil.Discard, resp.Body)
+ resp.Body.Close()
+ }()
if code := resp.StatusCode; code < 200 || code >= 400 {
return fmt.Errorf("git http get: unexpected http status: %d", code)
@@ -211,7 +116,7 @@ func (cl *Clone) doGet(ctx context.Context) error {
return err
}
- for _, ref := range cl.Get.Refs() {
+ for _, ref := range cl.Get.Refs {
if strings.HasPrefix(ref.Name, "refs/heads/") || strings.HasPrefix(ref.Name, "refs/tags/") {
cl.wants = append(cl.wants, ref.Oid)
}
diff --git a/internal/git/stats/analyzehttp_test.go b/internal/git/stats/analyzehttp_test.go
index 75b27eb12..c041db9b1 100644
--- a/internal/git/stats/analyzehttp_test.go
+++ b/internal/git/stats/analyzehttp_test.go
@@ -28,9 +28,9 @@ func TestClone(t *testing.T) {
require.Greater(t, clone.RefsWanted(), expectedWants, "number of wanted refs")
require.Equal(t, 200, clone.Get.HTTPStatus(), "get status")
- require.Greater(t, clone.Get.Packets(), 0, "number of get packets")
- require.Greater(t, clone.Get.PayloadSize(), int64(0), "get payload size")
- require.Greater(t, len(clone.Get.Caps()), 10, "get capabilities")
+ require.Greater(t, clone.Get.Packets, 0, "number of get packets")
+ require.Greater(t, clone.Get.PayloadSize, int64(0), "get payload size")
+ require.Greater(t, len(clone.Get.Caps), 10, "get capabilities")
previousValue := time.Duration(0)
for _, m := range []struct {
diff --git a/internal/git/stats/reference_discovery.go b/internal/git/stats/reference_discovery.go
new file mode 100644
index 000000000..19e626b79
--- /dev/null
+++ b/internal/git/stats/reference_discovery.go
@@ -0,0 +1,137 @@
+package stats
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "strings"
+ "time"
+
+ "gitlab.com/gitlab-org/gitaly/internal/git/pktline"
+ "gitlab.com/gitlab-org/gitaly/internal/helper/text"
+)
+
+// Reference as used by the reference discovery protocol
+type Reference struct {
+ // Oid is the object ID the reference points to
+ Oid string
+ // Name of the reference. The name will be suffixed with ^{} in case
+ // the reference is the peeled commit.
+ Name string
+}
+
+// ReferenceDiscovery contains information about a reference discovery session.
+type ReferenceDiscovery struct {
+ // FirstPacket tracks the time when the first pktline was received
+ FirstPacket time.Time
+ // LastPacket tracks the time when the last pktline was received
+ LastPacket time.Time
+ // PayloadSize tracks the size of all pktlines' data
+ PayloadSize int64
+ // Packets tracks the total number of packets consumed
+ Packets int
+ // Refs contains all announced references
+ Refs []Reference
+ // Caps contains all supported capabilities
+ Caps []string
+}
+
+type referenceDiscoveryState int
+
+const (
+ referenceDiscoveryExpectService referenceDiscoveryState = iota
+ referenceDiscoveryExpectFlush
+ referenceDiscoveryExpectRefWithCaps
+ referenceDiscoveryExpectRef
+ referenceDiscoveryExpectEnd
+)
+
+// ParseReferenceDiscovery parses a client's reference discovery stream and
+// returns either information about the reference discovery or an error in case
+// it couldn't make sense of the client's request.
+func ParseReferenceDiscovery(body io.Reader) (ReferenceDiscovery, error) {
+ d := ReferenceDiscovery{}
+ return d, d.Parse(body)
+}
+
+// Parse parses a client's reference discovery stream into the given
+// ReferenceDiscovery struct or returns an error in case it couldn't make sense
+// of the client's request.
+//
+// Expected protocol:
+// - "# service=git-upload-pack\n"
+// - FLUSH
+// - "<OID> <ref>\x00<capabilities>\n"
+// - "<OID> <ref>\n"
+// - ...
+// - FLUSH
+func (d *ReferenceDiscovery) Parse(body io.Reader) error {
+ state := referenceDiscoveryExpectService
+ scanner := pktline.NewScanner(body)
+
+ for ; scanner.Scan(); d.Packets++ {
+ pkt := scanner.Bytes()
+ data := text.ChompBytes(pktline.Data(pkt))
+ d.PayloadSize += int64(len(data))
+
+ switch state {
+ case referenceDiscoveryExpectService:
+ d.FirstPacket = time.Now()
+ if data != "# service=git-upload-pack" {
+ return fmt.Errorf("unexpected header %q", data)
+ }
+
+ state = referenceDiscoveryExpectFlush
+ case referenceDiscoveryExpectFlush:
+ if !pktline.IsFlush(pkt) {
+ return errors.New("missing flush after service announcement")
+ }
+
+ state = referenceDiscoveryExpectRefWithCaps
+ case referenceDiscoveryExpectRefWithCaps:
+ split := strings.SplitN(data, "\000", 2)
+ if len(split) != 2 {
+ return errors.New("invalid first reference line")
+ }
+
+ ref := strings.SplitN(string(split[0]), " ", 2)
+ if len(ref) != 2 {
+ return errors.New("invalid reference line")
+ }
+ d.Refs = append(d.Refs, Reference{Oid: ref[0], Name: ref[1]})
+ d.Caps = strings.Split(string(split[1]), " ")
+
+ state = referenceDiscoveryExpectRef
+ case referenceDiscoveryExpectRef:
+ if pktline.IsFlush(pkt) {
+ state = referenceDiscoveryExpectEnd
+ continue
+ }
+
+ split := strings.SplitN(data, " ", 2)
+ if len(split) != 2 {
+ return errors.New("invalid reference line")
+ }
+ d.Refs = append(d.Refs, Reference{Oid: split[0], Name: split[1]})
+ case referenceDiscoveryExpectEnd:
+ return errors.New("received packet after flush")
+ }
+ }
+
+ if err := scanner.Err(); err != nil {
+ return err
+ }
+ if len(d.Refs) == 0 {
+ return errors.New("received no references")
+ }
+ if len(d.Caps) == 0 {
+ return errors.New("received no capabilities")
+ }
+ if state != referenceDiscoveryExpectEnd {
+ return errors.New("discovery ended prematurely")
+ }
+
+ d.LastPacket = time.Now()
+
+ return nil
+}
diff --git a/internal/git/stats/reference_discovery_test.go b/internal/git/stats/reference_discovery_test.go
new file mode 100644
index 000000000..00bc947e3
--- /dev/null
+++ b/internal/git/stats/reference_discovery_test.go
@@ -0,0 +1,79 @@
+package stats
+
+import (
+ "bytes"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/git/pktline"
+)
+
+func TestSingleRefParses(t *testing.T) {
+ buf := &bytes.Buffer{}
+ pktline.WriteString(buf, "# service=git-upload-pack\n")
+ pktline.WriteFlush(buf)
+ pktline.WriteString(buf, oid1+" HEAD\x00capability")
+ pktline.WriteFlush(buf)
+
+ d, err := ParseReferenceDiscovery(buf)
+ require.NoError(t, err)
+ require.Equal(t, []string{"capability"}, d.Caps)
+ require.Equal(t, []Reference{{Oid: oid1, Name: "HEAD"}}, d.Refs)
+}
+
+func TestMultipleRefsAndCapsParse(t *testing.T) {
+ buf := &bytes.Buffer{}
+ pktline.WriteString(buf, "# service=git-upload-pack\n")
+ pktline.WriteFlush(buf)
+ pktline.WriteString(buf, oid1+" HEAD\x00first second")
+ pktline.WriteString(buf, oid2+" refs/heads/master")
+ pktline.WriteFlush(buf)
+
+ d, err := ParseReferenceDiscovery(buf)
+ require.NoError(t, err)
+ require.Equal(t, []string{"first", "second"}, d.Caps)
+ require.Equal(t, []Reference{{Oid: oid1, Name: "HEAD"}, {Oid: oid2, Name: "refs/heads/master"}}, d.Refs)
+}
+
+func TestInvalidHeaderFails(t *testing.T) {
+ buf := &bytes.Buffer{}
+ pktline.WriteString(buf, "# service=invalid\n")
+ pktline.WriteFlush(buf)
+ pktline.WriteString(buf, oid1+" HEAD\x00caps")
+ pktline.WriteFlush(buf)
+
+ _, err := ParseReferenceDiscovery(buf)
+ require.Error(t, err)
+}
+
+func TestMissingRefsFail(t *testing.T) {
+ buf := &bytes.Buffer{}
+ pktline.WriteString(buf, "# service=git-upload-pack\n")
+ pktline.WriteFlush(buf)
+ pktline.WriteFlush(buf)
+
+ _, err := ParseReferenceDiscovery(buf)
+ require.Error(t, err)
+}
+
+func TestInvalidRefFail(t *testing.T) {
+ buf := &bytes.Buffer{}
+ pktline.WriteString(buf, "# service=git-upload-pack\n")
+ pktline.WriteFlush(buf)
+ pktline.WriteString(buf, oid1+" HEAD\x00caps")
+ pktline.WriteString(buf, oid2)
+ pktline.WriteFlush(buf)
+
+ _, err := ParseReferenceDiscovery(buf)
+ require.Error(t, err)
+}
+
+func TestMissingTrailingFlushFails(t *testing.T) {
+ buf := &bytes.Buffer{}
+ pktline.WriteString(buf, "# service=git-upload-pack\n")
+ pktline.WriteFlush(buf)
+ pktline.WriteString(buf, oid1+" HEAD\x00caps")
+
+ d := ReferenceDiscovery{}
+ require.Error(t, d.Parse(buf))
+}
diff --git a/internal/service/smarthttp/inforefs_test.go b/internal/service/smarthttp/inforefs_test.go
index 899a39d3e..9606fb1c4 100644
--- a/internal/service/smarthttp/inforefs_test.go
+++ b/internal/service/smarthttp/inforefs_test.go
@@ -71,11 +71,11 @@ func TestSuccessfulInfoRefsUploadWithPartialClone(t *testing.T) {
err = partialRefs.Parse(bytes.NewReader(partialResponse))
require.NoError(t, err)
- require.Equal(t, fullRefs.Refs(), partialRefs.Refs())
+ require.Equal(t, fullRefs.Refs, partialRefs.Refs)
for _, c := range []string{"allow-tip-sha1-in-want", "allow-reachable-sha1-in-want", "filter"} {
- require.Contains(t, partialRefs.Caps(), c)
- require.NotContains(t, fullRefs.Caps(), c)
+ require.Contains(t, partialRefs.Caps, c)
+ require.NotContains(t, fullRefs.Caps, c)
}
}