diff options
author | GitLab Bot <gitlab-bot@gitlab.com> | 2020-12-17 14:59:07 +0300 |
---|---|---|
committer | GitLab Bot <gitlab-bot@gitlab.com> | 2020-12-17 14:59:07 +0300 |
commit | 8b573c94895dc0ac0e1d9d59cf3e8745e8b539ca (patch) | |
tree | 544930fb309b30317ae9797a9683768705d664c4 /workhorse/internal/httprs | |
parent | 4b1de649d0168371549608993deac953eb692019 (diff) |
Add latest changes from gitlab-org/gitlab@13-7-stable-eev13.7.0-rc42
Diffstat (limited to 'workhorse/internal/httprs')
-rw-r--r-- | workhorse/internal/httprs/LICENSE | 19 | ||||
-rw-r--r-- | workhorse/internal/httprs/README.md | 2 | ||||
-rw-r--r-- | workhorse/internal/httprs/httprs.go | 217 | ||||
-rw-r--r-- | workhorse/internal/httprs/httprs_test.go | 257 |
4 files changed, 495 insertions, 0 deletions
diff --git a/workhorse/internal/httprs/LICENSE b/workhorse/internal/httprs/LICENSE new file mode 100644 index 00000000000..58b9dd5ced1 --- /dev/null +++ b/workhorse/internal/httprs/LICENSE @@ -0,0 +1,19 @@ +Copyright (c) 2015 Jean-François Bustarret + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE.
\ No newline at end of file diff --git a/workhorse/internal/httprs/README.md b/workhorse/internal/httprs/README.md new file mode 100644 index 00000000000..4f42489ab73 --- /dev/null +++ b/workhorse/internal/httprs/README.md @@ -0,0 +1,2 @@ +This directory contains a vendored copy of https://github.com/jfbus/httprs at commit SHA +b0af8319bb15446bbf29715477f841a49330a1e7. diff --git a/workhorse/internal/httprs/httprs.go b/workhorse/internal/httprs/httprs.go new file mode 100644 index 00000000000..a38230c1968 --- /dev/null +++ b/workhorse/internal/httprs/httprs.go @@ -0,0 +1,217 @@ +/* +Package httprs provides a ReadSeeker for http.Response.Body. + +Usage : + + resp, err := http.Get(url) + rs := httprs.NewHttpReadSeeker(resp) + defer rs.Close() + io.ReadFull(rs, buf) // reads the first bytes from the response body + rs.Seek(1024, 0) // moves the position, but does no range request + io.ReadFull(rs, buf) // does a range request and reads from the response body + +If you want use a specific http.Client for additional range requests : + rs := httprs.NewHttpReadSeeker(resp, client) +*/ +package httprs + +import ( + "context" + "errors" + "fmt" + "io" + "io/ioutil" + "net/http" + + "github.com/mitchellh/copystructure" +) + +const shortSeekBytes = 1024 + +// A HttpReadSeeker reads from a http.Response.Body. It can Seek +// by doing range requests. +type HttpReadSeeker struct { + c *http.Client + req *http.Request + res *http.Response + ctx context.Context + r io.ReadCloser + pos int64 + + Requests int +} + +var _ io.ReadCloser = (*HttpReadSeeker)(nil) +var _ io.Seeker = (*HttpReadSeeker)(nil) + +var ( + // ErrNoContentLength is returned by Seek when the initial http response did not include a Content-Length header + ErrNoContentLength = errors.New("header Content-Length was not set") + // ErrRangeRequestsNotSupported is returned by Seek and Read + // when the remote server does not allow range requests (Accept-Ranges was not set) + ErrRangeRequestsNotSupported = errors.New("range requests are not supported by the remote server") + // ErrInvalidRange is returned by Read when trying to read past the end of the file + ErrInvalidRange = errors.New("invalid range") + // ErrContentHasChanged is returned by Read when the content has changed since the first request + ErrContentHasChanged = errors.New("content has changed since first request") +) + +// NewHttpReadSeeker returns a HttpReadSeeker, using the http.Response and, optionaly, the http.Client +// that needs to be used for future range requests. If no http.Client is given, http.DefaultClient will +// be used. +// +// res.Request will be reused for range requests, headers may be added/removed +func NewHttpReadSeeker(res *http.Response, client ...*http.Client) *HttpReadSeeker { + r := &HttpReadSeeker{ + req: res.Request, + ctx: res.Request.Context(), + res: res, + r: res.Body, + } + if len(client) > 0 { + r.c = client[0] + } else { + r.c = http.DefaultClient + } + return r +} + +// Clone clones the reader to enable parallel downloads of ranges +func (r *HttpReadSeeker) Clone() (*HttpReadSeeker, error) { + req, err := copystructure.Copy(r.req) + if err != nil { + return nil, err + } + return &HttpReadSeeker{ + req: req.(*http.Request), + res: r.res, + r: nil, + c: r.c, + }, nil +} + +// Read reads from the response body. It does a range request if Seek was called before. +// +// May return ErrRangeRequestsNotSupported, ErrInvalidRange or ErrContentHasChanged +func (r *HttpReadSeeker) Read(p []byte) (n int, err error) { + if r.r == nil { + err = r.rangeRequest() + } + if r.r != nil { + n, err = r.r.Read(p) + r.pos += int64(n) + } + return +} + +// ReadAt reads from the response body starting at offset off. +// +// May return ErrRangeRequestsNotSupported, ErrInvalidRange or ErrContentHasChanged +func (r *HttpReadSeeker) ReadAt(p []byte, off int64) (n int, err error) { + var nn int + + r.Seek(off, 0) + + for n < len(p) && err == nil { + nn, err = r.Read(p[n:]) + n += nn + } + return +} + +// Close closes the response body +func (r *HttpReadSeeker) Close() error { + if r.r != nil { + return r.r.Close() + } + return nil +} + +// Seek moves the reader position to a new offset. +// +// It does not send http requests, allowing for multiple seeks without overhead. +// The http request will be sent by the next Read call. +// +// May return ErrNoContentLength or ErrRangeRequestsNotSupported +func (r *HttpReadSeeker) Seek(offset int64, whence int) (int64, error) { + var err error + switch whence { + case 0: + case 1: + offset += r.pos + case 2: + if r.res.ContentLength <= 0 { + return 0, ErrNoContentLength + } + offset = r.res.ContentLength - offset + } + if r.r != nil { + // Try to read, which is cheaper than doing a request + if r.pos < offset && offset-r.pos <= shortSeekBytes { + _, err := io.CopyN(ioutil.Discard, r, offset-r.pos) + if err != nil { + return 0, err + } + } + + if r.pos != offset { + err = r.r.Close() + r.r = nil + } + } + r.pos = offset + return r.pos, err +} + +func cloneHeader(h http.Header) http.Header { + h2 := make(http.Header, len(h)) + for k, vv := range h { + vv2 := make([]string, len(vv)) + copy(vv2, vv) + h2[k] = vv2 + } + return h2 +} + +func (r *HttpReadSeeker) newRequest() *http.Request { + newreq := r.req.WithContext(r.ctx) // includes shallow copies of maps, but okay + if r.req.ContentLength == 0 { + newreq.Body = nil // Issue 16036: nil Body for http.Transport retries + } + newreq.Header = cloneHeader(r.req.Header) + return newreq +} + +func (r *HttpReadSeeker) rangeRequest() error { + r.req = r.newRequest() + r.req.Header.Set("Range", fmt.Sprintf("bytes=%d-", r.pos)) + etag, last := r.res.Header.Get("ETag"), r.res.Header.Get("Last-Modified") + switch { + case last != "": + r.req.Header.Set("If-Range", last) + case etag != "": + r.req.Header.Set("If-Range", etag) + } + + r.Requests++ + + res, err := r.c.Do(r.req) + if err != nil { + return err + } + switch res.StatusCode { + case http.StatusRequestedRangeNotSatisfiable: + return ErrInvalidRange + case http.StatusOK: + // some servers return 200 OK for bytes=0- + if r.pos > 0 || + (etag != "" && etag != res.Header.Get("ETag")) { + return ErrContentHasChanged + } + fallthrough + case http.StatusPartialContent: + r.r = res.Body + return nil + } + return ErrRangeRequestsNotSupported +} diff --git a/workhorse/internal/httprs/httprs_test.go b/workhorse/internal/httprs/httprs_test.go new file mode 100644 index 00000000000..62279d895c9 --- /dev/null +++ b/workhorse/internal/httprs/httprs_test.go @@ -0,0 +1,257 @@ +package httprs + +import ( + "fmt" + "io" + "io/ioutil" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "testing" + "time" + + . "github.com/smartystreets/goconvey/convey" +) + +type fakeResponseWriter struct { + code int + h http.Header + tmp *os.File +} + +func (f *fakeResponseWriter) Header() http.Header { + return f.h +} + +func (f *fakeResponseWriter) Write(b []byte) (int, error) { + return f.tmp.Write(b) +} + +func (f *fakeResponseWriter) Close(b []byte) error { + return f.tmp.Close() +} + +func (f *fakeResponseWriter) WriteHeader(code int) { + f.code = code +} + +func (f *fakeResponseWriter) Response() *http.Response { + f.tmp.Seek(0, io.SeekStart) + return &http.Response{Body: f.tmp, StatusCode: f.code, Header: f.h} +} + +type fakeRoundTripper struct { + src *os.File + downgradeZeroToNoRange bool +} + +func (f *fakeRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { + fw := &fakeResponseWriter{h: http.Header{}} + var err error + fw.tmp, err = ioutil.TempFile(os.TempDir(), "httprs") + if err != nil { + return nil, err + } + if f.downgradeZeroToNoRange { + // There are implementations that downgrades bytes=0- to a normal un-ranged GET + if r.Header.Get("Range") == "bytes=0-" { + r.Header.Del("Range") + } + } + http.ServeContent(fw, r, "temp.txt", time.Now(), f.src) + + return fw.Response(), nil +} + +const SZ = 4096 + +const ( + downgradeZeroToNoRange = 1 << iota + sendAcceptRanges +) + +type RSFactory func() *HttpReadSeeker + +func newRSFactory(flags int) RSFactory { + return func() *HttpReadSeeker { + tmp, err := ioutil.TempFile(os.TempDir(), "httprs") + if err != nil { + return nil + } + for i := 0; i < SZ; i++ { + tmp.WriteString(fmt.Sprintf("%04d", i)) + } + + req, err := http.NewRequest("GET", "http://www.example.com", nil) + if err != nil { + return nil + } + res := &http.Response{ + Request: req, + ContentLength: SZ * 4, + } + + if flags&sendAcceptRanges > 0 { + res.Header = http.Header{"Accept-Ranges": []string{"bytes"}} + } + + downgradeZeroToNoRange := (flags & downgradeZeroToNoRange) > 0 + return NewHttpReadSeeker(res, &http.Client{Transport: &fakeRoundTripper{src: tmp, downgradeZeroToNoRange: downgradeZeroToNoRange}}) + } +} + +func TestHttpWebServer(t *testing.T) { + Convey("Scenario: testing WebServer", t, func() { + dir, err := ioutil.TempDir("", "webserver") + So(err, ShouldBeNil) + defer os.RemoveAll(dir) + + err = ioutil.WriteFile(filepath.Join(dir, "file"), make([]byte, 10000), 0755) + So(err, ShouldBeNil) + + server := httptest.NewServer(http.FileServer(http.Dir(dir))) + + Convey("When requesting /file", func() { + res, err := http.Get(server.URL + "/file") + So(err, ShouldBeNil) + + stream := NewHttpReadSeeker(res) + So(stream, ShouldNotBeNil) + + Convey("Can read 100 bytes from start of file", func() { + n, err := stream.Read(make([]byte, 100)) + So(err, ShouldBeNil) + So(n, ShouldEqual, 100) + + Convey("When seeking 4KiB forward", func() { + pos, err := stream.Seek(4096, io.SeekCurrent) + So(err, ShouldBeNil) + So(pos, ShouldEqual, 4096+100) + + Convey("Can read 100 bytes", func() { + n, err := stream.Read(make([]byte, 100)) + So(err, ShouldBeNil) + So(n, ShouldEqual, 100) + }) + }) + }) + }) + }) +} + +func TestHttpReaderSeeker(t *testing.T) { + tests := []struct { + name string + newRS func() *HttpReadSeeker + }{ + {name: "with no flags", newRS: newRSFactory(0)}, + {name: "with only Accept-Ranges", newRS: newRSFactory(sendAcceptRanges)}, + {name: "downgrade 0-range to no range", newRS: newRSFactory(downgradeZeroToNoRange)}, + {name: "downgrade 0-range with Accept-Ranges", newRS: newRSFactory(downgradeZeroToNoRange | sendAcceptRanges)}, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + testHttpReaderSeeker(t, test.newRS) + }) + } +} + +func testHttpReaderSeeker(t *testing.T, newRS RSFactory) { + Convey("Scenario: testing HttpReaderSeeker", t, func() { + + Convey("Read should start at the beginning", func() { + r := newRS() + So(r, ShouldNotBeNil) + defer r.Close() + buf := make([]byte, 4) + n, err := io.ReadFull(r, buf) + So(n, ShouldEqual, 4) + So(err, ShouldBeNil) + So(string(buf), ShouldEqual, "0000") + }) + + Convey("Seek w SEEK_SET should seek to right offset", func() { + r := newRS() + So(r, ShouldNotBeNil) + defer r.Close() + s, err := r.Seek(4*64, io.SeekStart) + So(s, ShouldEqual, 4*64) + So(err, ShouldBeNil) + buf := make([]byte, 4) + n, err := io.ReadFull(r, buf) + So(n, ShouldEqual, 4) + So(err, ShouldBeNil) + So(string(buf), ShouldEqual, "0064") + }) + + Convey("Read + Seek w SEEK_CUR should seek to right offset", func() { + r := newRS() + So(r, ShouldNotBeNil) + defer r.Close() + buf := make([]byte, 4) + io.ReadFull(r, buf) + s, err := r.Seek(4*64, os.SEEK_CUR) + So(s, ShouldEqual, 4*64+4) + So(err, ShouldBeNil) + n, err := io.ReadFull(r, buf) + So(n, ShouldEqual, 4) + So(err, ShouldBeNil) + So(string(buf), ShouldEqual, "0065") + }) + + Convey("Seek w SEEK_END should seek to right offset", func() { + r := newRS() + So(r, ShouldNotBeNil) + defer r.Close() + buf := make([]byte, 4) + io.ReadFull(r, buf) + s, err := r.Seek(4, os.SEEK_END) + So(s, ShouldEqual, SZ*4-4) + So(err, ShouldBeNil) + n, err := io.ReadFull(r, buf) + So(n, ShouldEqual, 4) + So(err, ShouldBeNil) + So(string(buf), ShouldEqual, fmt.Sprintf("%04d", SZ-1)) + }) + + Convey("Short seek should consume existing request", func() { + r := newRS() + So(r, ShouldNotBeNil) + defer r.Close() + buf := make([]byte, 4) + So(r.Requests, ShouldEqual, 0) + io.ReadFull(r, buf) + So(r.Requests, ShouldEqual, 1) + s, err := r.Seek(shortSeekBytes, os.SEEK_CUR) + So(r.Requests, ShouldEqual, 1) + So(s, ShouldEqual, shortSeekBytes+4) + So(err, ShouldBeNil) + n, err := io.ReadFull(r, buf) + So(n, ShouldEqual, 4) + So(err, ShouldBeNil) + So(string(buf), ShouldEqual, "0257") + So(r.Requests, ShouldEqual, 1) + }) + + Convey("Long seek should do a new request", func() { + r := newRS() + So(r, ShouldNotBeNil) + defer r.Close() + buf := make([]byte, 4) + So(r.Requests, ShouldEqual, 0) + io.ReadFull(r, buf) + So(r.Requests, ShouldEqual, 1) + s, err := r.Seek(shortSeekBytes+1, os.SEEK_CUR) + So(r.Requests, ShouldEqual, 1) + So(s, ShouldEqual, shortSeekBytes+4+1) + So(err, ShouldBeNil) + n, err := io.ReadFull(r, buf) + So(n, ShouldEqual, 4) + So(err, ShouldBeNil) + So(string(buf), ShouldEqual, "2570") + So(r.Requests, ShouldEqual, 2) + }) + }) +} |