Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2021-06-14 09:18:50 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2021-06-14 09:18:50 +0300
commitf365b27d869b4e96850a10bdd40b775ab37045b6 (patch)
treeb6a4d9fbb12f19eb20d2a59a90362ecb8e8f01bc
parentb541a391a9eebc7a0b85e37fcf12e8c5bf1be5e1 (diff)
parentaf0834bff09d9935b2890312aa29c510193fb132 (diff)
Merge branch 'pks-blackbox-cleanups' into 'master'
blackbox: Code cleanups for blackbox and related stats package See merge request gitlab-org/gitaly!3578
-rw-r--r--.golangci.yml88
-rw-r--r--cmd/gitaly-blackbox/main.go6
-rw-r--r--cmd/gitaly-debug/analyzehttp.go44
-rw-r--r--internal/blackbox/blackbox.go101
-rw-r--r--internal/blackbox/config.go53
-rw-r--r--internal/blackbox/config_test.go2
-rw-r--r--internal/blackbox/prometheus.go29
-rw-r--r--internal/git/stats/analyzehttp.go358
-rw-r--r--internal/git/stats/clone.go303
-rw-r--r--internal/git/stats/clone_test.go (renamed from internal/git/stats/analyzehttp_test.go)58
-rw-r--r--internal/git/stats/fetch_pack.go133
-rw-r--r--internal/gitaly/service/smarthttp/inforefs_test.go2
12 files changed, 606 insertions, 571 deletions
diff --git a/.golangci.yml b/.golangci.yml
index 1b14fe8be..68d9c5603 100644
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -57,22 +57,6 @@ issues:
text: "exported function `BuildCommit` should have comment or be unexported"
- linters:
- golint
- path: "internal/blackbox/blackbox.go"
- text: "exported function `Run` should have comment or be unexported"
- - linters:
- - golint
- path: "internal/blackbox/config.go"
- text: "exported type `Config` should have comment or be unexported"
- - linters:
- - golint
- path: "internal/blackbox/config.go"
- text: "exported type `Probe` should have comment or be unexported"
- - linters:
- - golint
- path: "internal/blackbox/config.go"
- text: "exported function `ParseConfig` should have comment or be unexported"
- - linters:
- - golint
path: "internal/cgroups/noop.go"
text: "exported method `NoopManager.Setup` should have comment or be unexported"
- linters:
@@ -261,74 +245,6 @@ issues:
text: "exported function `WithPackfileNegotiationMetrics` should have comment or be unexported"
- linters:
- golint
- path: "internal/git/stats/analyzehttp.go"
- text: "exported type `Post` should have comment or be unexported"
- - linters:
- - golint
- path: "internal/git/stats/analyzehttp.go"
- text: "exported method `Post.ResponseHeader` should have comment or be unexported"
- - linters:
- - golint
- path: "internal/git/stats/analyzehttp.go"
- text: "exported method `Post.HTTPStatus` should have comment or be unexported"
- - linters:
- - golint
- path: "internal/git/stats/analyzehttp.go"
- text: "exported method `Post.NAK` should have comment or be unexported"
- - linters:
- - golint
- path: "internal/git/stats/analyzehttp.go"
- text: "exported method `Post.ResponseBody` should have comment or be unexported"
- - linters:
- - golint
- path: "internal/git/stats/analyzehttp.go"
- text: "exported method `Post.Packets` should have comment or be unexported"
- - linters:
- - golint
- path: "internal/git/stats/analyzehttp.go"
- text: "exported method `Post.LargestPacketSize` should have comment or be unexported"
- - linters:
- - golint
- path: "internal/git/stats/analyzehttp.go"
- text: "exported method `Post.BandPackets` should have comment or be unexported"
- - linters:
- - golint
- path: "internal/git/stats/analyzehttp.go"
- text: "exported method `Post.BandPayloadSize` should have comment or be unexported"
- - linters:
- - golint
- path: "internal/git/stats/analyzehttp.go"
- text: "exported method `Post.BandFirstPacket` should have comment or be unexported"
- - linters:
- - golint
- path: "internal/git/stats/analyzehttp.go"
- text: "exported type `Clone` should have comment or be unexported"
- - linters:
- - golint
- path: "internal/git/stats/analyzehttp.go"
- text: "exported method `Clone.RefsWanted` should have comment or be unexported"
- - linters:
- - golint
- path: "internal/git/stats/analyzehttp.go"
- text: "exported type `Get` should have comment or be unexported"
- - linters:
- - golint
- path: "internal/git/stats/analyzehttp.go"
- text: "exported method `Get.ResponseHeader` should have comment or be unexported"
- - linters:
- - golint
- path: "internal/git/stats/analyzehttp.go"
- text: "exported method `Get.HTTPStatus` should have comment or be unexported"
- - linters:
- - golint
- path: "internal/git/stats/analyzehttp.go"
- text: "exported method `Get.FirstGitPacket` should have comment or be unexported"
- - linters:
- - golint
- path: "internal/git/stats/analyzehttp.go"
- text: "exported method `Get.ResponseBody` should have comment or be unexported"
- - linters:
- - golint
path: "internal/git/stats/packfile_negotiation.go"
text: "exported type `PackfileNegotiation` should have comment or be unexported"
- linters:
@@ -793,10 +709,6 @@ issues:
text: "Error return value is not checked"
- linters:
- errcheck
- path: "internal/git/stats/analyzehttp.go"
- text: "Error return value of `cl.printInteractive` is not checked"
- - linters:
- - errcheck
path: "internal/supervisor/supervisor_test.go"
text: "Error return value of `syscall\\.Kill` is not checked"
## errcheck: Specific issues in *_test.go files
diff --git a/cmd/gitaly-blackbox/main.go b/cmd/gitaly-blackbox/main.go
index 3ef2d7352..bcf31a194 100644
--- a/cmd/gitaly-blackbox/main.go
+++ b/cmd/gitaly-blackbox/main.go
@@ -6,6 +6,7 @@ import (
"io/ioutil"
"os"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/v14/internal/blackbox"
"gitlab.com/gitlab-org/gitaly/v14/internal/log"
@@ -53,7 +54,10 @@ func run(configPath string) error {
return err
}
+ bb := blackbox.New(config)
+ prometheus.MustRegister(bb)
+
log.Configure(log.Loggers, config.Logging.Format, config.Logging.Level)
- return blackbox.Run(config)
+ return bb.Run()
}
diff --git a/cmd/gitaly-debug/analyzehttp.go b/cmd/gitaly-debug/analyzehttp.go
index 94299d036..dda0cb11e 100644
--- a/cmd/gitaly-debug/analyzehttp.go
+++ b/cmd/gitaly-debug/analyzehttp.go
@@ -8,48 +8,44 @@ import (
)
func analyzeHTTPClone(cloneURL string) {
- st := &stats.Clone{
- URL: cloneURL,
- Interactive: true,
- }
-
- noError(st.Perform(context.Background()))
+ st, err := stats.PerformClone(context.Background(), cloneURL, "", "", true)
+ noError(err)
- fmt.Println("\n--- GET metrics:")
+ fmt.Println("\n--- Reference discovery metrics:")
for _, entry := range []metric{
- {"response header time", st.Get.ResponseHeader()},
- {"first Git packet", st.Get.FirstGitPacket()},
- {"response body time", st.Get.ResponseBody()},
- {"payload size", st.Get.PayloadSize},
- {"Git packets received", st.Get.Packets},
- {"refs advertised", len(st.Get.Refs)},
- {"wanted refs", st.RefsWanted()},
+ {"response header time", st.ReferenceDiscovery.ResponseHeader()},
+ {"first Git packet", st.ReferenceDiscovery.FirstGitPacket()},
+ {"response body time", st.ReferenceDiscovery.ResponseBody()},
+ {"payload size", st.ReferenceDiscovery.PayloadSize()},
+ {"Git packets received", st.ReferenceDiscovery.Packets()},
+ {"refs advertised", len(st.ReferenceDiscovery.Refs())},
} {
entry.print()
}
- fmt.Println("\n--- POST metrics:")
+ fmt.Println("\n--- Fetch pack metrics:")
for _, entry := range []metric{
- {"response header time", st.Post.ResponseHeader()},
- {"time to server NAK", st.Post.NAK()},
- {"response body time", st.Post.ResponseBody()},
- {"largest single Git packet", st.Post.LargestPacketSize()},
- {"Git packets received", st.Post.Packets()},
+ {"response header time", st.FetchPack.ResponseHeader()},
+ {"time to server NAK", st.FetchPack.NAK()},
+ {"response body time", st.FetchPack.ResponseBody()},
+ {"largest single Git packet", st.FetchPack.LargestPacketSize()},
+ {"Git packets received", st.FetchPack.Packets()},
+ {"wanted refs", st.FetchPack.RefsWanted()},
} {
entry.print()
}
for _, band := range stats.Bands() {
- numPackets := st.Post.BandPackets(band)
+ numPackets := st.FetchPack.BandPackets(band)
if numPackets == 0 {
continue
}
- fmt.Printf("\n--- POST %s band\n", band)
+ fmt.Printf("\n--- FetchPack %s band\n", band)
for _, entry := range []metric{
- {"time to first packet", st.Post.BandFirstPacket(band)},
+ {"time to first packet", st.FetchPack.BandFirstPacket(band)},
{"packets", numPackets},
- {"total payload size", st.Post.BandPayloadSize(band)},
+ {"total payload size", st.FetchPack.BandPayloadSize(band)},
} {
entry.print()
}
diff --git a/internal/blackbox/blackbox.go b/internal/blackbox/blackbox.go
index b0af1b301..dd77d60f8 100644
--- a/internal/blackbox/blackbox.go
+++ b/internal/blackbox/blackbox.go
@@ -12,21 +12,81 @@ import (
"gitlab.com/gitlab-org/labkit/monitoring"
)
-func Run(cfg *Config) error {
- listener, err := net.Listen("tcp", cfg.PrometheusListenAddr)
+// Blackbox encapsulates all details required to run the blackbox prober.
+type Blackbox struct {
+ cfg Config
+
+ getFirstPacket *prometheus.GaugeVec
+ getTotalTime *prometheus.GaugeVec
+ getAdvertisedRefs *prometheus.GaugeVec
+ wantedRefs *prometheus.GaugeVec
+ postTotalTime *prometheus.GaugeVec
+ postFirstProgressPacket *prometheus.GaugeVec
+ postFirstPackPacket *prometheus.GaugeVec
+ postPackBytes *prometheus.GaugeVec
+}
+
+// New creates a new Blackbox structure.
+func New(cfg Config) Blackbox {
+ return Blackbox{
+ cfg: cfg,
+ getFirstPacket: newGauge("get_first_packet_seconds", "Time to first Git packet in GET /info/refs response"),
+ getTotalTime: newGauge("get_total_time_seconds", "Time to receive entire GET /info/refs response"),
+ getAdvertisedRefs: newGauge("get_advertised_refs", "Number of Git refs advertised in GET /info/refs"),
+ wantedRefs: newGauge("wanted_refs", "Number of Git refs selected for (fake) Git clone (branches + tags)"),
+ postTotalTime: newGauge("post_total_time_seconds", "Time to receive entire POST /upload-pack response"),
+ postFirstProgressPacket: newGauge("post_first_progress_packet_seconds", "Time to first progress band Git packet in POST /upload-pack response"),
+ postFirstPackPacket: newGauge("post_first_pack_packet_seconds", "Time to first pack band Git packet in POST /upload-pack response"),
+ postPackBytes: newGauge("post_pack_bytes", "Number of pack band bytes in POST /upload-pack response"),
+ }
+}
+
+func newGauge(name string, help string) *prometheus.GaugeVec {
+ return prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Namespace: "gitaly_blackbox",
+ Subsystem: "git_http",
+ Name: name,
+ Help: help,
+ },
+ []string{"probe"},
+ )
+}
+
+// Describe is used to describe Prometheus metrics.
+func (b Blackbox) Describe(descs chan<- *prometheus.Desc) {
+ prometheus.DescribeByCollect(b, descs)
+}
+
+// Collect is used to collect Prometheus metrics.
+func (b Blackbox) Collect(metrics chan<- prometheus.Metric) {
+ b.getFirstPacket.Collect(metrics)
+ b.getTotalTime.Collect(metrics)
+ b.getAdvertisedRefs.Collect(metrics)
+ b.wantedRefs.Collect(metrics)
+ b.postTotalTime.Collect(metrics)
+ b.postFirstProgressPacket.Collect(metrics)
+ b.postFirstPackPacket.Collect(metrics)
+ b.postPackBytes.Collect(metrics)
+}
+
+// Run starts the blackbox. It sets up and serves the Prometheus listener and starts a Goroutine
+// which runs the probes.
+func (b Blackbox) Run() error {
+ listener, err := net.Listen("tcp", b.cfg.PrometheusListenAddr)
if err != nil {
return err
}
- go runProbes(cfg)
+ go b.runProbes()
return servePrometheus(listener)
}
-func runProbes(cfg *Config) {
- for ; ; time.Sleep(cfg.SleepDuration) {
- for _, probe := range cfg.Probes {
- doProbe(probe)
+func (b Blackbox) runProbes() {
+ for ; ; time.Sleep(b.cfg.sleepDuration) {
+ for _, probe := range b.cfg.Probes {
+ b.doProbe(probe)
}
}
}
@@ -38,20 +98,15 @@ func servePrometheus(l net.Listener) error {
)
}
-func doProbe(probe Probe) {
+func (b Blackbox) doProbe(probe Probe) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
entry := log.WithField("probe", probe.Name)
entry.Info("starting probe")
- clone := &stats.Clone{
- URL: probe.URL,
- User: probe.User,
- Password: probe.Password,
- }
-
- if err := clone.Perform(ctx); err != nil {
+ clone, err := stats.PerformClone(ctx, probe.URL, probe.User, probe.Password, false)
+ if err != nil {
entry.WithError(err).Error("probe failed")
return
}
@@ -62,12 +117,12 @@ func doProbe(probe Probe) {
gv.WithLabelValues(probe.Name).Set(value)
}
- setGauge(getFirstPacket, clone.Get.FirstGitPacket().Seconds())
- setGauge(getTotalTime, clone.Get.ResponseBody().Seconds())
- setGauge(getAdvertisedRefs, float64(len(clone.Get.Refs)))
- setGauge(wantedRefs, float64(clone.RefsWanted()))
- setGauge(postTotalTime, clone.Post.ResponseBody().Seconds())
- setGauge(postFirstProgressPacket, clone.Post.BandFirstPacket("progress").Seconds())
- setGauge(postFirstPackPacket, clone.Post.BandFirstPacket("pack").Seconds())
- setGauge(postPackBytes, float64(clone.Post.BandPayloadSize("pack")))
+ setGauge(b.getFirstPacket, clone.ReferenceDiscovery.FirstGitPacket().Seconds())
+ setGauge(b.getTotalTime, clone.ReferenceDiscovery.ResponseBody().Seconds())
+ setGauge(b.getAdvertisedRefs, float64(len(clone.ReferenceDiscovery.Refs())))
+ setGauge(b.wantedRefs, float64(clone.FetchPack.RefsWanted()))
+ setGauge(b.postTotalTime, clone.FetchPack.ResponseBody().Seconds())
+ setGauge(b.postFirstProgressPacket, clone.FetchPack.BandFirstPacket("progress").Seconds())
+ setGauge(b.postFirstPackPacket, clone.FetchPack.BandFirstPacket("pack").Seconds())
+ setGauge(b.postPackBytes, float64(clone.FetchPack.BandPayloadSize("pack")))
}
diff --git a/internal/blackbox/config.go b/internal/blackbox/config.go
index 064b7788b..6e8da380b 100644
--- a/internal/blackbox/config.go
+++ b/internal/blackbox/config.go
@@ -9,55 +9,72 @@ import (
logconfig "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config/log"
)
+// Config is the configuration for gitaly-blackbox.
type Config struct {
+ // PrometheusListenAddr is the listen address on which Prometheus metrics should be
+ // made available for clients.
PrometheusListenAddr string `toml:"prometheus_listen_addr"`
- Sleep int `toml:"sleep"`
- SleepDuration time.Duration
- Logging logconfig.Config `toml:"logging"`
- Probes []Probe `toml:"probe"`
+ // Sleep is the number of seconds between probe runs.
+ Sleep int `toml:"sleep"`
+ // sleepDuration is the same as Sleep but converted to a proper duration.
+ sleepDuration time.Duration
+ // Logging configures logging.
+ Logging logconfig.Config `toml:"logging"`
+ // Probes defines endpoints to probe. At least one probe must be defined.
+ Probes []Probe `toml:"probe"`
}
+// Probe is the configuration for a specific endpoint whose clone performance should be exercised.
type Probe struct {
- Name string `toml:"name"`
- URL string `toml:"url"`
- User string `toml:"user"`
+ // Name is the name of the probe. This is used both for logging and for exported
+ // Prometheus metrics.
+ Name string `toml:"name"`
+ // URL is the URL of the Git repository that should be probed. For now, only the
+ // HTTP transport is supported.
+ URL string `toml:"url"`
+ // User is the user to authenticate as when connecting to the repository.
+ User string `toml:"user"`
+ // Password is the password to authenticate with when connecting to the repository.
+ // Note that this password may easily leak when connecting to a non-HTTPS URL.
Password string `toml:"password"`
}
-func ParseConfig(raw string) (*Config, error) {
- config := &Config{}
- if err := toml.Unmarshal([]byte(raw), config); err != nil {
- return nil, err
+// ParseConfig parses the provided TOML-formatted configuration string and either returns the
+// parsed configuration or an error.
+func ParseConfig(raw string) (Config, error) {
+ config := Config{}
+ if err := toml.Unmarshal([]byte(raw), &config); err != nil {
+ return Config{}, err
}
if config.PrometheusListenAddr == "" {
- return nil, fmt.Errorf("missing prometheus_listen_addr")
+ return Config{}, fmt.Errorf("missing prometheus_listen_addr")
}
if config.Sleep < 0 {
- return nil, fmt.Errorf("sleep time is less than 0")
+ return Config{}, fmt.Errorf("sleep time is less than 0")
}
if config.Sleep == 0 {
config.Sleep = 15 * 60
}
- config.SleepDuration = time.Duration(config.Sleep) * time.Second
+ config.sleepDuration = time.Duration(config.Sleep) * time.Second
if len(config.Probes) == 0 {
- return nil, fmt.Errorf("must define at least one probe")
+ return Config{}, fmt.Errorf("must define at least one probe")
}
for _, probe := range config.Probes {
if len(probe.Name) == 0 {
- return nil, fmt.Errorf("all probes must have a 'name' attribute")
+ return Config{}, fmt.Errorf("all probes must have a 'name' attribute")
}
parsedURL, err := url.Parse(probe.URL)
if err != nil {
- return nil, err
+ return Config{}, err
}
if s := parsedURL.Scheme; s != "http" && s != "https" {
- return nil, fmt.Errorf("unsupported probe URL scheme: %v", probe.URL)
+ return Config{}, fmt.Errorf("unsupported probe URL scheme: %v", probe.URL)
}
}
diff --git a/internal/blackbox/config_test.go b/internal/blackbox/config_test.go
index fb41a0f25..5aa364ade 100644
--- a/internal/blackbox/config_test.go
+++ b/internal/blackbox/config_test.go
@@ -57,7 +57,7 @@ url = 'http://foo/bar'
cfg, err := ParseConfig(tc.in + validConfig)
require.NoError(t, err, "parse config")
- require.Equal(t, tc.out, cfg.SleepDuration, "parsed sleep time")
+ require.Equal(t, tc.out, cfg.sleepDuration, "parsed sleep time")
})
}
}
diff --git a/internal/blackbox/prometheus.go b/internal/blackbox/prometheus.go
deleted file mode 100644
index 7929ad5e4..000000000
--- a/internal/blackbox/prometheus.go
+++ /dev/null
@@ -1,29 +0,0 @@
-package blackbox
-
-import (
- "github.com/prometheus/client_golang/prometheus"
- "github.com/prometheus/client_golang/prometheus/promauto"
-)
-
-var (
- getFirstPacket = newGauge("get_first_packet_seconds", "Time to first Git packet in GET /info/refs response")
- getTotalTime = newGauge("get_total_time_seconds", "Time to receive entire GET /info/refs response")
- getAdvertisedRefs = newGauge("get_advertised_refs", "Number of Git refs advertised in GET /info/refs")
- wantedRefs = newGauge("wanted_refs", "Number of Git refs selected for (fake) Git clone (branches + tags)")
- postTotalTime = newGauge("post_total_time_seconds", "Time to receive entire POST /upload-pack response")
- postFirstProgressPacket = newGauge("post_first_progress_packet_seconds", "Time to first progress band Git packet in POST /upload-pack response")
- postFirstPackPacket = newGauge("post_first_pack_packet_seconds", "Time to first pack band Git packet in POST /upload-pack response")
- postPackBytes = newGauge("post_pack_bytes", "Number of pack band bytes in POST /upload-pack response")
-)
-
-func newGauge(name string, help string) *prometheus.GaugeVec {
- return promauto.NewGaugeVec(
- prometheus.GaugeOpts{
- Namespace: "gitaly_blackbox",
- Subsystem: "git_http",
- Name: name,
- Help: help,
- },
- []string{"probe"},
- )
-}
diff --git a/internal/git/stats/analyzehttp.go b/internal/git/stats/analyzehttp.go
deleted file mode 100644
index a4ed522b0..000000000
--- a/internal/git/stats/analyzehttp.go
+++ /dev/null
@@ -1,358 +0,0 @@
-package stats
-
-import (
- "bytes"
- "compress/gzip"
- "context"
- "errors"
- "fmt"
- "io"
- "io/ioutil"
- "net/http"
- "os"
- "strings"
- "time"
-
- "gitlab.com/gitlab-org/gitaly/v14/internal/git/pktline"
-)
-
-type Clone struct {
- URL string
- Interactive bool
- User string
- Password string
-
- wants []string // all branch and tag pointers
- Get
- Post
-}
-
-func (cl *Clone) RefsWanted() int { return len(cl.wants) }
-
-// Perform does a Git HTTP clone, discarding cloned data to /dev/null.
-func (cl *Clone) Perform(ctx context.Context) error {
- if err := cl.doGet(ctx); err != nil {
- return ctxErr(ctx, err)
- }
-
- if err := cl.doPost(ctx); err != nil {
- return ctxErr(ctx, err)
- }
-
- return nil
-}
-
-func ctxErr(ctx context.Context, err error) error {
- if ctx.Err() != nil {
- return ctx.Err()
- }
- return err
-}
-
-type Get struct {
- start time.Time
- responseHeader time.Duration
- httpStatus int
- ReferenceDiscovery
-}
-
-func (g *Get) ResponseHeader() time.Duration { return g.responseHeader }
-func (g *Get) HTTPStatus() int { return g.httpStatus }
-func (g *Get) FirstGitPacket() time.Duration { return g.FirstPacket.Sub(g.start) }
-func (g *Get) ResponseBody() time.Duration { return g.LastPacket.Sub(g.start) }
-
-func (cl *Clone) doGet(ctx context.Context) error {
- req, err := http.NewRequest("GET", cl.URL+"/info/refs?service=git-upload-pack", nil)
- if err != nil {
- return err
- }
-
- req = req.WithContext(ctx)
- if cl.User != "" {
- req.SetBasicAuth(cl.User, cl.Password)
- }
-
- for k, v := range map[string]string{
- "User-Agent": "gitaly-debug",
- "Accept": "*/*",
- "Accept-Encoding": "deflate, gzip",
- "Pragma": "no-cache",
- } {
- req.Header.Set(k, v)
- }
-
- cl.Get.start = time.Now()
- cl.printInteractive("---")
- cl.printInteractive("--- GET %v", req.URL)
- cl.printInteractive("---")
-
- resp, err := http.DefaultClient.Do(req)
- if err != nil {
- return err
- }
- defer func() {
- io.Copy(ioutil.Discard, resp.Body)
- resp.Body.Close()
- }()
-
- if code := resp.StatusCode; code < 200 || code >= 400 {
- return fmt.Errorf("git http get: unexpected http status: %d", code)
- }
-
- cl.Get.responseHeader = time.Since(cl.Get.start)
- cl.Get.httpStatus = resp.StatusCode
- cl.printInteractive("response code: %d", resp.StatusCode)
- cl.printInteractive("response header: %v", resp.Header)
-
- body := resp.Body
- if resp.Header.Get("Content-Encoding") == "gzip" {
- body, err = gzip.NewReader(body)
- if err != nil {
- return err
- }
- }
-
- if err := cl.Get.Parse(body); err != nil {
- return err
- }
-
- for _, ref := range cl.Get.Refs {
- if strings.HasPrefix(ref.Name, "refs/heads/") || strings.HasPrefix(ref.Name, "refs/tags/") {
- cl.wants = append(cl.wants, ref.Oid)
- }
- }
-
- return nil
-}
-
-type Post struct {
- start time.Time
- responseHeader time.Duration
- httpStatus int
- nak time.Duration
- multiband map[string]*bandInfo
- responseBody time.Duration
- packets int
- largestPacketSize int
-}
-
-func (p *Post) ResponseHeader() time.Duration { return p.responseHeader }
-func (p *Post) HTTPStatus() int { return p.httpStatus }
-func (p *Post) NAK() time.Duration { return p.nak }
-func (p *Post) ResponseBody() time.Duration { return p.responseBody }
-func (p *Post) Packets() int { return p.packets }
-func (p *Post) LargestPacketSize() int { return p.largestPacketSize }
-
-func (p *Post) BandPackets(b string) int { return p.multiband[b].packets }
-func (p *Post) BandPayloadSize(b string) int64 { return p.multiband[b].size }
-func (p *Post) BandFirstPacket(b string) time.Duration { return p.multiband[b].firstPacket }
-
-type bandInfo struct {
- firstPacket time.Duration
- size int64
- packets int
-}
-
-func (bi *bandInfo) consume(start time.Time, data []byte) {
- if bi.packets == 0 {
- bi.firstPacket = time.Since(start)
- }
- bi.size += int64(len(data))
- bi.packets++
-}
-
-// See
-// https://github.com/git/git/blob/v2.25.0/Documentation/technical/http-protocol.txt#L351
-// for background information.
-func (cl *Clone) buildPost(ctx context.Context) (*http.Request, error) {
- reqBodyRaw := &bytes.Buffer{}
- reqBodyGzip := gzip.NewWriter(reqBodyRaw)
- for i, oid := range cl.wants {
- if i == 0 {
- oid += " multi_ack_detailed no-done side-band-64k thin-pack ofs-delta deepen-since deepen-not agent=git/2.21.0"
- }
- if _, err := pktline.WriteString(reqBodyGzip, "want "+oid+"\n"); err != nil {
- return nil, err
- }
- }
- if err := pktline.WriteFlush(reqBodyGzip); err != nil {
- return nil, err
- }
- if _, err := pktline.WriteString(reqBodyGzip, "done\n"); err != nil {
- return nil, err
- }
- if err := reqBodyGzip.Close(); err != nil {
- return nil, err
- }
-
- req, err := http.NewRequest("POST", cl.URL+"/git-upload-pack", reqBodyRaw)
- if err != nil {
- return nil, err
- }
-
- req = req.WithContext(ctx)
- if cl.User != "" {
- req.SetBasicAuth(cl.User, cl.Password)
- }
-
- for k, v := range map[string]string{
- "User-Agent": "gitaly-debug",
- "Content-Type": "application/x-git-upload-pack-request",
- "Accept": "application/x-git-upload-pack-result",
- "Content-Encoding": "gzip",
- } {
- req.Header.Set(k, v)
- }
-
- return req, nil
-}
-
-func (cl *Clone) doPost(ctx context.Context) error {
- req, err := cl.buildPost(ctx)
- if err != nil {
- return err
- }
-
- cl.Post.start = time.Now()
- cl.printInteractive("---")
- cl.printInteractive("--- POST %v", req.URL)
- cl.printInteractive("---")
-
- resp, err := http.DefaultClient.Do(req)
- if err != nil {
- return err
- }
- defer resp.Body.Close()
-
- if code := resp.StatusCode; code < 200 || code >= 400 {
- return fmt.Errorf("git http post: unexpected http status: %d", code)
- }
-
- cl.Post.responseHeader = time.Since(cl.Post.start)
- cl.Post.httpStatus = resp.StatusCode
- cl.printInteractive("response code: %d", resp.StatusCode)
- cl.printInteractive("response header: %v", resp.Header)
-
- // Expected response:
- // - "NAK\n"
- // - "<side band byte><pack or progress or error data>
- // - ...
- // - FLUSH
- //
-
- cl.Post.multiband = make(map[string]*bandInfo)
- for _, band := range Bands() {
- cl.Post.multiband[band] = &bandInfo{}
- }
-
- seenFlush := false
-
- scanner := pktline.NewScanner(resp.Body)
- for ; scanner.Scan(); cl.Post.packets++ {
- if seenFlush {
- return errors.New("received extra packet after flush")
- }
-
- if n := len(scanner.Bytes()); n > cl.Post.largestPacketSize {
- cl.Post.largestPacketSize = n
- }
-
- data := pktline.Data(scanner.Bytes())
-
- if cl.Post.packets == 0 {
- // We're now looking at the first git packet sent by the server. The
- // server must conclude the ref negotiation. Because we have not sent any
- // "have" messages there is nothing to negotiate and the server should
- // send a single NAK.
- if !bytes.Equal([]byte("NAK\n"), data) {
- return fmt.Errorf("expected NAK, got %q", data)
- }
- cl.Post.nak = time.Since(cl.Post.start)
- continue
- }
-
- if pktline.IsFlush(scanner.Bytes()) {
- seenFlush = true
- continue
- }
-
- if len(data) == 0 {
- return errors.New("empty packet in PACK data")
- }
-
- band, err := bandToHuman(data[0])
- if err != nil {
- return err
- }
-
- cl.Post.multiband[band].consume(cl.Post.start, data[1:])
-
- // Print progress data as-is
- if cl.Interactive && band == bandProgress {
- if _, err := os.Stdout.Write(data[1:]); err != nil {
- return err
- }
- }
-
- if cl.Interactive && cl.Post.packets%500 == 0 && cl.Post.packets > 0 && band == bandPack {
- // Print dots to have some sort of progress meter for the user in
- // interactive mode. It's not accurate progress, but it shows that
- // something is happening.
- if _, err := fmt.Print("."); err != nil {
- return err
- }
- }
- }
-
- if cl.Interactive {
- // Trailing newline for progress dots.
- if _, err := fmt.Println(""); err != nil {
- return err
- }
- }
-
- if err := scanner.Err(); err != nil {
- return err
- }
- if !seenFlush {
- return errors.New("POST response did not end in flush")
- }
-
- cl.Post.responseBody = time.Since(cl.Post.start)
- return nil
-}
-
-func (cl *Clone) printInteractive(format string, a ...interface{}) error {
- if !cl.Interactive {
- return nil
- }
-
- if _, err := fmt.Println(fmt.Sprintf(format, a...)); err != nil {
- return err
- }
-
- return nil
-}
-
-const (
- bandPack = "pack"
- bandProgress = "progress"
- bandError = "error"
-)
-
-// Bands returns the slice of bands which git uses to transport different kinds
-// of data in a multiplexed way. See
-// https://git-scm.com/docs/protocol-capabilities/2.24.0#_side_band_side_band_64k
-// for more information about the different bands.
-func Bands() []string { return []string{bandPack, bandProgress, bandError} }
-
-func bandToHuman(b byte) (string, error) {
- bands := Bands()
-
- // Band index bytes are 1-indexed.
- if b < 1 || int(b) > len(bands) {
- return "", fmt.Errorf("invalid band index: %d", b)
- }
-
- return bands[b-1], nil
-}
diff --git a/internal/git/stats/clone.go b/internal/git/stats/clone.go
new file mode 100644
index 000000000..18f881e9a
--- /dev/null
+++ b/internal/git/stats/clone.go
@@ -0,0 +1,303 @@
+package stats
+
+import (
+ "bytes"
+ "compress/gzip"
+ "context"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net/http"
+ "strings"
+ "time"
+
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/pktline"
+)
+
+// Clone hosts information about a typical HTTP-based clone.
+type Clone struct {
+ // ReferenceDiscovery is the reference discovery performed as part of the clone.
+ ReferenceDiscovery HTTPReferenceDiscovery
+ // FetchPack is the response to a git-fetch-pack(1) request which computes transmits the
+ // packfile.
+ FetchPack HTTPFetchPack
+}
+
+// PerformClone does a Git HTTP clone, discarding cloned data to /dev/null.
+func PerformClone(ctx context.Context, url, user, password string, interactive bool) (Clone, error) {
+ printInteractive := func(format string, a ...interface{}) {
+ if interactive {
+ // Ignore any errors returned by this given that we only use it as a
+ // debugging aid to write to stdout.
+ fmt.Printf(format, a...)
+ }
+ }
+
+ referenceDiscovery, err := performReferenceDiscovery(ctx, url, user, password, printInteractive)
+ if err != nil {
+ return Clone{}, ctxErr(ctx, err)
+ }
+
+ fetchPack, err := performFetchPack(ctx, url, user, password,
+ referenceDiscovery.Refs(), printInteractive)
+ if err != nil {
+ return Clone{}, ctxErr(ctx, err)
+ }
+
+ return Clone{
+ ReferenceDiscovery: referenceDiscovery,
+ FetchPack: fetchPack,
+ }, nil
+}
+
+func ctxErr(ctx context.Context, err error) error {
+ if ctx.Err() != nil {
+ return ctx.Err()
+ }
+ return err
+}
+
+// HTTPReferenceDiscovery is a ReferenceDiscovery obtained via a clone of a target repository via
+// HTTP. It contains additional information about the cloning process like status codes and
+// timings.
+type HTTPReferenceDiscovery struct {
+ start time.Time
+ responseHeader time.Duration
+ httpStatus int
+ stats ReferenceDiscovery
+}
+
+// ResponseHeader returns how long it took to receive the response header.
+func (d HTTPReferenceDiscovery) ResponseHeader() time.Duration { return d.responseHeader }
+
+// HTTPStatus returns the HTTP status code.
+func (d HTTPReferenceDiscovery) HTTPStatus() int { return d.httpStatus }
+
+// FirstGitPacket returns how long it took to receive the first Git packet.
+func (d HTTPReferenceDiscovery) FirstGitPacket() time.Duration {
+ return d.stats.FirstPacket.Sub(d.start)
+}
+
+// ResponseBody returns how long it took to receive the first bytes of the response body.
+func (d HTTPReferenceDiscovery) ResponseBody() time.Duration {
+ return d.stats.LastPacket.Sub(d.start)
+}
+
+// Refs returns all announced references.
+func (d HTTPReferenceDiscovery) Refs() []Reference { return d.stats.Refs }
+
+// Packets returns the number of Git packets received.
+func (d HTTPReferenceDiscovery) Packets() int { return d.stats.Packets }
+
+// PayloadSize returns the total size of all pktlines' data.
+func (d HTTPReferenceDiscovery) PayloadSize() int64 { return d.stats.PayloadSize }
+
+// Caps returns all announced capabilities.
+func (d HTTPReferenceDiscovery) Caps() []string { return d.stats.Caps }
+
+func performReferenceDiscovery(
+ ctx context.Context,
+ url, user, password string,
+ reportProgress func(string, ...interface{}),
+) (HTTPReferenceDiscovery, error) {
+ var referenceDiscovery HTTPReferenceDiscovery
+
+ req, err := http.NewRequest("GET", url+"/info/refs?service=git-upload-pack", nil)
+ if err != nil {
+ return HTTPReferenceDiscovery{}, err
+ }
+
+ req = req.WithContext(ctx)
+ if user != "" {
+ req.SetBasicAuth(user, password)
+ }
+
+ for k, v := range map[string]string{
+ "User-Agent": "gitaly-debug",
+ "Accept": "*/*",
+ "Accept-Encoding": "deflate, gzip",
+ "Pragma": "no-cache",
+ } {
+ req.Header.Set(k, v)
+ }
+
+ referenceDiscovery.start = time.Now()
+ reportProgress("---\n")
+ reportProgress("--- GET %v\n", req.URL)
+ reportProgress("---\n")
+
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ return HTTPReferenceDiscovery{}, err
+ }
+ defer func() {
+ io.Copy(ioutil.Discard, resp.Body)
+ resp.Body.Close()
+ }()
+
+ if code := resp.StatusCode; code < 200 || code >= 400 {
+ return HTTPReferenceDiscovery{}, fmt.Errorf("git http get: unexpected http status: %d", code)
+ }
+
+ referenceDiscovery.responseHeader = time.Since(referenceDiscovery.start)
+ referenceDiscovery.httpStatus = resp.StatusCode
+ reportProgress("response code: %d\n", resp.StatusCode)
+ reportProgress("response header: %v\n", resp.Header)
+
+ body := resp.Body
+ if resp.Header.Get("Content-Encoding") == "gzip" {
+ body, err = gzip.NewReader(body)
+ if err != nil {
+ return HTTPReferenceDiscovery{}, err
+ }
+ }
+
+ if err := referenceDiscovery.stats.Parse(body); err != nil {
+ return HTTPReferenceDiscovery{}, err
+ }
+
+ return referenceDiscovery, nil
+}
+
+// HTTPFetchPack is a FetchPack obtained via a clone of a target repository via HTTP. It contains
+// additional information about the cloning process like status codes and timings.
+type HTTPFetchPack struct {
+ start time.Time
+ responseHeader time.Duration
+ httpStatus int
+ stats FetchPack
+ wantedRefs []string
+}
+
+// ResponseHeader returns how long it took to receive the response header.
+func (p *HTTPFetchPack) ResponseHeader() time.Duration { return p.responseHeader }
+
+// HTTPStatus returns the HTTP status code.
+func (p *HTTPFetchPack) HTTPStatus() int { return p.httpStatus }
+
+// NAK returns how long it took to receive the NAK which signals that negotiation has concluded.
+func (p *HTTPFetchPack) NAK() time.Duration { return p.stats.nak.Sub(p.start) }
+
+// ResponseBody returns how long it took to receive the first bytes of the response body.
+func (p *HTTPFetchPack) ResponseBody() time.Duration { return p.stats.responseBody.Sub(p.start) }
+
+// Packets returns the number of Git packets received.
+func (p *HTTPFetchPack) Packets() int { return p.stats.packets }
+
+// LargestPacketSize returns the largest packet size received.
+func (p *HTTPFetchPack) LargestPacketSize() int { return p.stats.largestPacketSize }
+
+// RefsWanted returns the number of references sent to the remote repository as "want"s.
+func (p *HTTPFetchPack) RefsWanted() int { return len(p.wantedRefs) }
+
+// BandPackets returns how many packets were received on a specific sideband.
+func (p *HTTPFetchPack) BandPackets(b string) int { return p.stats.multiband[b].packets }
+
+// BandPayloadSize returns how many bytes were received on a specific sideband.
+func (p *HTTPFetchPack) BandPayloadSize(b string) int64 { return p.stats.multiband[b].size }
+
+// BandFirstPacket returns how long it took to receive the first packet on a specific sideband.
+func (p *HTTPFetchPack) BandFirstPacket(b string) time.Duration {
+ return p.stats.multiband[b].firstPacket.Sub(p.start)
+}
+
+// See https://github.com/git/git/blob/v2.25.0/Documentation/technical/http-protocol.txt#L351
+// for background information.
+func buildFetchPackRequest(
+ ctx context.Context,
+ url, user, password string,
+ announcedRefs []Reference,
+) (*http.Request, []string, error) {
+ var wants []string
+ for _, ref := range announcedRefs {
+ if strings.HasPrefix(ref.Name, "refs/heads/") || strings.HasPrefix(ref.Name, "refs/tags/") {
+ wants = append(wants, ref.Oid)
+ }
+ }
+
+ reqBodyRaw := &bytes.Buffer{}
+ reqBodyGzip := gzip.NewWriter(reqBodyRaw)
+ for i, oid := range wants {
+ if i == 0 {
+ oid += " multi_ack_detailed no-done side-band-64k thin-pack ofs-delta deepen-since deepen-not agent=git/2.21.0"
+ }
+ if _, err := pktline.WriteString(reqBodyGzip, "want "+oid+"\n"); err != nil {
+ return nil, nil, err
+ }
+ }
+ if err := pktline.WriteFlush(reqBodyGzip); err != nil {
+ return nil, nil, err
+ }
+ if _, err := pktline.WriteString(reqBodyGzip, "done\n"); err != nil {
+ return nil, nil, err
+ }
+ if err := reqBodyGzip.Close(); err != nil {
+ return nil, nil, err
+ }
+
+ req, err := http.NewRequest("POST", url+"/git-upload-pack", reqBodyRaw)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ req = req.WithContext(ctx)
+ if user != "" {
+ req.SetBasicAuth(user, password)
+ }
+
+ for k, v := range map[string]string{
+ "User-Agent": "gitaly-debug",
+ "Content-Type": "application/x-git-upload-pack-request",
+ "Accept": "application/x-git-upload-pack-result",
+ "Content-Encoding": "gzip",
+ } {
+ req.Header.Set(k, v)
+ }
+
+ return req, wants, nil
+}
+
+func performFetchPack(
+ ctx context.Context,
+ url, user, password string,
+ announcedRefs []Reference,
+ reportProgress func(string, ...interface{}),
+) (HTTPFetchPack, error) {
+ var fetchPack HTTPFetchPack
+
+ req, wants, err := buildFetchPackRequest(ctx, url, user, password, announcedRefs)
+ if err != nil {
+ return HTTPFetchPack{}, err
+ }
+ fetchPack.wantedRefs = wants
+
+ fetchPack.start = time.Now()
+ reportProgress("---\n")
+ reportProgress("--- POST %v\n", req.URL)
+ reportProgress("---\n")
+
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ return HTTPFetchPack{}, err
+ }
+ defer resp.Body.Close()
+
+ if code := resp.StatusCode; code < 200 || code >= 400 {
+ return HTTPFetchPack{}, fmt.Errorf("git http post: unexpected http status: %d", code)
+ }
+
+ fetchPack.responseHeader = time.Since(fetchPack.start)
+ fetchPack.httpStatus = resp.StatusCode
+ reportProgress("response code: %d\n", resp.StatusCode)
+ reportProgress("response header: %v\n", resp.Header)
+
+ fetchPack.stats.ReportProgress = func(b []byte) { reportProgress("%s", string(b)) }
+
+ if err := fetchPack.stats.Parse(resp.Body); err != nil {
+ return HTTPFetchPack{}, err
+ }
+
+ reportProgress("\n")
+
+ return fetchPack, nil
+}
diff --git a/internal/git/stats/analyzehttp_test.go b/internal/git/stats/clone_test.go
index 9c645e398..4e25cea93 100644
--- a/internal/git/stats/analyzehttp_test.go
+++ b/internal/git/stats/clone_test.go
@@ -24,49 +24,49 @@ func TestClone(t *testing.T) {
require.NoError(t, stopGitServer())
}()
- clone := Clone{URL: fmt.Sprintf("http://localhost:%d/%s", serverPort, filepath.Base(repoPath))}
- require.NoError(t, clone.Perform(ctx), "perform analysis clone")
+ clone, err := PerformClone(ctx, fmt.Sprintf("http://localhost:%d/%s", serverPort, filepath.Base(repoPath)), "", "", false)
+ require.NoError(t, err, "perform analysis clone")
- const expectedWants = 90 // based on contents of _support/gitlab-test.git-packed-refs
- require.Greater(t, clone.RefsWanted(), expectedWants, "number of wanted refs")
+ const expectedRequests = 90 // based on contents of _support/gitlab-test.git-packed-refs
+ require.Greater(t, clone.FetchPack.RefsWanted(), expectedRequests, "number of wanted refs")
- require.Equal(t, 200, clone.Get.HTTPStatus(), "get status")
- require.Greater(t, clone.Get.Packets, 0, "number of get packets")
- require.Greater(t, clone.Get.PayloadSize, int64(0), "get payload size")
- require.Greater(t, len(clone.Get.Caps), 10, "get capabilities")
+ require.Equal(t, 200, clone.ReferenceDiscovery.HTTPStatus(), "get status")
+ require.Greater(t, clone.ReferenceDiscovery.Packets(), 0, "number of get packets")
+ require.Greater(t, clone.ReferenceDiscovery.PayloadSize(), int64(0), "get payload size")
+ require.Greater(t, len(clone.ReferenceDiscovery.Caps()), 10, "get capabilities")
previousValue := time.Duration(0)
for _, m := range []struct {
desc string
value time.Duration
}{
- {"time to receive response header", clone.Get.ResponseHeader()},
- {"time to first packet", clone.Get.FirstGitPacket()},
- {"time to receive response body", clone.Get.ResponseBody()},
+ {"time to receive response header", clone.ReferenceDiscovery.ResponseHeader()},
+ {"time to first packet", clone.ReferenceDiscovery.FirstGitPacket()},
+ {"time to receive response body", clone.ReferenceDiscovery.ResponseBody()},
} {
require.True(t, m.value > previousValue, "get: expect %s (%v) to be greater than previous value %v", m.desc, m.value, previousValue)
previousValue = m.value
}
- require.Equal(t, 200, clone.Post.HTTPStatus(), "post status")
- require.Greater(t, clone.Post.Packets(), 0, "number of post packets")
+ require.Equal(t, 200, clone.FetchPack.HTTPStatus(), "post status")
+ require.Greater(t, clone.FetchPack.Packets(), 0, "number of post packets")
- require.Greater(t, clone.Post.BandPackets("progress"), 0, "number of progress packets")
- require.Greater(t, clone.Post.BandPackets("pack"), 0, "number of pack packets")
+ require.Greater(t, clone.FetchPack.BandPackets("progress"), 0, "number of progress packets")
+ require.Greater(t, clone.FetchPack.BandPackets("pack"), 0, "number of pack packets")
- require.Greater(t, clone.Post.BandPayloadSize("progress"), int64(0), "progress payload bytes")
- require.Greater(t, clone.Post.BandPayloadSize("pack"), int64(0), "pack payload bytes")
+ require.Greater(t, clone.FetchPack.BandPayloadSize("progress"), int64(0), "progress payload bytes")
+ require.Greater(t, clone.FetchPack.BandPayloadSize("pack"), int64(0), "pack payload bytes")
previousValue = time.Duration(0)
for _, m := range []struct {
desc string
value time.Duration
}{
- {"time to receive response header", clone.Post.ResponseHeader()},
- {"time to receive NAK", clone.Post.NAK()},
- {"time to receive first progress message", clone.Post.BandFirstPacket("progress")},
- {"time to receive first pack message", clone.Post.BandFirstPacket("pack")},
- {"time to receive response body", clone.Post.ResponseBody()},
+ {"time to receive response header", clone.FetchPack.ResponseHeader()},
+ {"time to receive NAK", clone.FetchPack.NAK()},
+ {"time to receive first progress message", clone.FetchPack.BandFirstPacket("progress")},
+ {"time to receive first pack message", clone.FetchPack.BandFirstPacket("pack")},
+ {"time to receive response body", clone.FetchPack.ResponseBody()},
} {
require.True(t, m.value > previousValue, "post: expect %s (%v) to be greater than previous value %v", m.desc, m.value, previousValue)
previousValue = m.value
@@ -100,12 +100,14 @@ func TestCloneWithAuth(t *testing.T) {
require.NoError(t, stopGitServer())
}()
- clone := Clone{
- URL: fmt.Sprintf("http://localhost:%d/%s", serverPort, filepath.Base(repoPath)),
- User: user,
- Password: password,
- }
- require.NoError(t, clone.Perform(ctx), "perform analysis clone")
+ _, err := PerformClone(
+ ctx,
+ fmt.Sprintf("http://localhost:%d/%s", serverPort, filepath.Base(repoPath)),
+ user,
+ password,
+ false,
+ )
+ require.NoError(t, err, "perform analysis clone")
require.True(t, authWasChecked, "authentication middleware should have gotten triggered")
}
diff --git a/internal/git/stats/fetch_pack.go b/internal/git/stats/fetch_pack.go
new file mode 100644
index 000000000..45ab298e9
--- /dev/null
+++ b/internal/git/stats/fetch_pack.go
@@ -0,0 +1,133 @@
+package stats
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+ "io"
+ "time"
+
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/pktline"
+)
+
+// FetchPack is used to parse the response body of a git-fetch-pack(1) request.
+type FetchPack struct {
+ // ReportProgress is an optional callback set by the caller. If set, this function
+ // will be called for all received progress packets.
+ ReportProgress func([]byte)
+
+ packets int
+ largestPacketSize int
+ multiband map[string]*bandInfo
+ nak time.Time
+ responseBody time.Time
+}
+
+// Parse parses the server response from a git-fetch-pack(1) request.
+func (f *FetchPack) Parse(body io.Reader) error {
+ // Expected response:
+ // - "NAK\n"
+ // - "<side band byte><pack or progress or error data>
+ // - ...
+ // - FLUSH
+
+ f.multiband = make(map[string]*bandInfo)
+ for _, band := range Bands() {
+ f.multiband[band] = &bandInfo{}
+ }
+
+ seenFlush := false
+
+ scanner := pktline.NewScanner(body)
+ for ; scanner.Scan(); f.packets++ {
+ if seenFlush {
+ return errors.New("received extra packet after flush")
+ }
+
+ if n := len(scanner.Bytes()); n > f.largestPacketSize {
+ f.largestPacketSize = n
+ }
+
+ data := pktline.Data(scanner.Bytes())
+
+ if f.packets == 0 {
+ // We're now looking at the first git packet sent by the server. The
+ // server must conclude the ref negotiation. Because we have not sent any
+ // "have" messages there is nothing to negotiate and the server should
+ // send a single NAK.
+ if !bytes.Equal([]byte("NAK\n"), data) {
+ return fmt.Errorf("expected NAK, got %q", data)
+ }
+ f.nak = time.Now()
+ continue
+ }
+
+ if pktline.IsFlush(scanner.Bytes()) {
+ seenFlush = true
+ continue
+ }
+
+ if len(data) == 0 {
+ return errors.New("empty packet in PACK data")
+ }
+
+ band, err := bandToHuman(data[0])
+ if err != nil {
+ return err
+ }
+
+ f.multiband[band].consume(data[1:])
+
+ // Print progress data as-is.
+ if f.ReportProgress != nil && band == bandProgress {
+ f.ReportProgress(data[1:])
+ }
+ }
+
+ if err := scanner.Err(); err != nil {
+ return err
+ }
+ if !seenFlush {
+ return errors.New("POST response did not end in flush")
+ }
+
+ f.responseBody = time.Now()
+ return nil
+}
+
+type bandInfo struct {
+ firstPacket time.Time
+ size int64
+ packets int
+}
+
+func (bi *bandInfo) consume(data []byte) {
+ if bi.packets == 0 {
+ bi.firstPacket = time.Now()
+ }
+ bi.size += int64(len(data))
+ bi.packets++
+}
+
+const (
+ bandPack = "pack"
+ bandProgress = "progress"
+ bandError = "error"
+)
+
+// Bands returns the slice of bands which git uses to transport different kinds
+// of data in a multiplexed way. See
+// https://git-scm.com/docs/protocol-capabilities/2.24.0#_side_band_side_band_64k
+// for more information about the different bands.
+func Bands() []string { return []string{bandPack, bandProgress, bandError} }
+
+func bandToHuman(b byte) (string, error) {
+ bands := Bands()
+
+ // Band index bytes are 1-indexed.
+ if b < 1 || int(b) > len(bands) {
+ return "", fmt.Errorf("invalid band index: %d", b)
+ }
+
+ return bands[b-1], nil
+}
diff --git a/internal/gitaly/service/smarthttp/inforefs_test.go b/internal/gitaly/service/smarthttp/inforefs_test.go
index f945526b8..cc336bec2 100644
--- a/internal/gitaly/service/smarthttp/inforefs_test.go
+++ b/internal/gitaly/service/smarthttp/inforefs_test.go
@@ -60,7 +60,7 @@ func TestSuccessfulInfoRefsUploadWithPartialClone(t *testing.T) {
partialResponse, err := makeInfoRefsUploadPackRequest(ctx, t, serverSocketPath, cfg.Auth.Token, request)
require.NoError(t, err)
- partialRefs := stats.Get{}
+ partialRefs := stats.ReferenceDiscovery{}
err = partialRefs.Parse(bytes.NewReader(partialResponse))
require.NoError(t, err)