diff options
Diffstat (limited to 'internal/gitaly/service/smarthttp/upload_pack.go')
-rw-r--r-- | internal/gitaly/service/smarthttp/upload_pack.go | 58 |
1 files changed, 37 insertions, 21 deletions
diff --git a/internal/gitaly/service/smarthttp/upload_pack.go b/internal/gitaly/service/smarthttp/upload_pack.go index 1d77d5ca8..5b5ad5171 100644 --- a/internal/gitaly/service/smarthttp/upload_pack.go +++ b/internal/gitaly/service/smarthttp/upload_pack.go @@ -1,6 +1,7 @@ package smarthttp import ( + "context" "crypto/sha1" "fmt" "io" @@ -36,22 +37,8 @@ func (s *server) PostUploadPack(stream gitalypb.SmartHTTPService_PostUploadPackS return resp.GetData(), err }), h) - pr, pw := io.Pipe() - defer pw.Close() - stdin := io.TeeReader(stdinReader, pw) - statsCh := make(chan stats.PackfileNegotiation, 1) - go func() { - defer close(statsCh) - - stats, err := stats.ParsePackfileNegotiation(pr) - if err != nil { - ctxlogrus.Extract(stream.Context()).WithError(err).Debug("failed parsing packfile negotiation") - return - } - stats.UpdateMetrics(s.packfileNegotiationMetrics) - - statsCh <- stats - }() + stdin, collector := s.runStatsCollector(stream.Context(), stdinReader) + defer collector.finish() var respBytes int64 @@ -96,8 +83,7 @@ func (s *server) PostUploadPack(stream gitalypb.SmartHTTPService_PostUploadPackS } if err := cmd.Wait(); err != nil { - pw.Close() // ensure PackfileNegotiation parser returns - stats := <-statsCh + stats := collector.finish() if _, ok := command.ExitStatus(err); ok && stats.Deepen != "" { // We have seen a 'deepen' message in the request. It is expected that @@ -109,9 +95,6 @@ func (s *server) PostUploadPack(stream gitalypb.SmartHTTPService_PostUploadPackS return status.Errorf(codes.Unavailable, "PostUploadPack: %v", err) } - pw.Close() // Ensure PackfileNegotiation parser returns - <-statsCh // Wait for the packfile negotiation parser to finish. - ctxlogrus.Extract(ctx).WithField("request_sha", fmt.Sprintf("%x", h.Sum(nil))).WithField("response_bytes", respBytes).Info("request details") return nil @@ -124,3 +107,36 @@ func validateUploadPackRequest(req *gitalypb.PostUploadPackRequest) error { return nil } + +type statsCollector struct { + c io.Closer + statsCh chan stats.PackfileNegotiation +} + +func (sc *statsCollector) finish() stats.PackfileNegotiation { + sc.c.Close() + return <-sc.statsCh +} + +func (s *server) runStatsCollector(ctx context.Context, r io.Reader) (io.Reader, *statsCollector) { + pr, pw := io.Pipe() + sc := &statsCollector{ + c: pw, + statsCh: make(chan stats.PackfileNegotiation, 1), + } + + go func() { + defer close(sc.statsCh) + + stats, err := stats.ParsePackfileNegotiation(pr) + if err != nil { + ctxlogrus.Extract(ctx).WithError(err).Debug("failed parsing packfile negotiation") + return + } + stats.UpdateMetrics(s.packfileNegotiationMetrics) + + sc.statsCh <- stats + }() + + return io.TeeReader(r, pw), sc +} |