diff options
author | Vladimir Shushlin <vshushlin@gitlab.com> | 2020-09-22 16:37:33 +0300 |
---|---|---|
committer | Vladimir Shushlin <vshushlin@gitlab.com> | 2020-09-22 16:37:33 +0300 |
commit | 7802bb75e8edafe05855fcbdb72aeea7bb906ae7 (patch) | |
tree | 5a322bb467f3fad0e3bd9ccc8d35e3cd64395a09 | |
parent | 78a56d61856a6d499225dfabdfb3ff1d27eeb8d0 (diff) | |
parent | 1b018b63c65eb6ef54a39db1f0fa9194c45c4bae (diff) |
Merge branch '443-pass-ctx-to-ranged-reader' into 'master'
Add ctx to httprange.Reader
See merge request gitlab-org/gitlab-pages!356
-rw-r--r-- | internal/httprange/http_ranged_reader.go | 12 | ||||
-rw-r--r-- | internal/httprange/http_ranged_reader_test.go | 39 | ||||
-rw-r--r-- | internal/httprange/http_reader.go | 10 | ||||
-rw-r--r-- | internal/httprange/http_reader_test.go | 4 | ||||
-rw-r--r-- | internal/vfs/zip/archive.go | 4 |
5 files changed, 55 insertions, 14 deletions
diff --git a/internal/httprange/http_ranged_reader.go b/internal/httprange/http_ranged_reader.go index d023521d..babff1aa 100644 --- a/internal/httprange/http_ranged_reader.go +++ b/internal/httprange/http_ranged_reader.go @@ -1,6 +1,7 @@ package httprange import ( + "context" "io" ) @@ -21,15 +22,16 @@ func (rr *RangedReader) cachedRead(buf []byte, off int64) (int, error) { } func (rr *RangedReader) ephemeralRead(buf []byte, offset int64) (n int, err error) { - reader := NewReader(rr.Resource, offset, int64(len(buf))) + // we can use context.Background and rely on the Reader's httpClient timeout for ephemeral reads + reader := NewReader(context.Background(), rr.Resource, offset, int64(len(buf))) defer reader.Close() return io.ReadFull(reader, buf) } // SectionReader partitions a resource from `offset` with a specified `size` -func (rr *RangedReader) SectionReader(offset, size int64) *Reader { - return NewReader(rr.Resource, offset, size) +func (rr *RangedReader) SectionReader(ctx context.Context, offset, size int64) *Reader { + return NewReader(ctx, rr.Resource, offset, size) } // ReadAt reads from cachedReader if exists, otherwise fetches a new Resource first. @@ -44,8 +46,8 @@ func (rr *RangedReader) ReadAt(buf []byte, offset int64) (n int, err error) { // WithCachedReader creates a Reader and saves it to the RangedReader instance. // It takes a readFunc that will Seek the contents from Reader. -func (rr *RangedReader) WithCachedReader(readFunc func()) { - rr.cachedReader = NewReader(rr.Resource, 0, rr.Resource.Size) +func (rr *RangedReader) WithCachedReader(ctx context.Context, readFunc func()) { + rr.cachedReader = NewReader(ctx, rr.Resource, 0, rr.Resource.Size) defer func() { rr.cachedReader.Close() diff --git a/internal/httprange/http_ranged_reader_test.go b/internal/httprange/http_ranged_reader_test.go index 72e645db..b17d06b1 100644 --- a/internal/httprange/http_ranged_reader_test.go +++ b/internal/httprange/http_ranged_reader_test.go @@ -86,7 +86,7 @@ func TestSectionReader(t *testing.T) { for name, tt := range tests { t.Run(name, func(t *testing.T) { rr := NewRangedReader(resource) - s := rr.SectionReader(int64(tt.sectionOffset), int64(tt.sectionSize)) + s := rr.SectionReader(context.Background(), int64(tt.sectionOffset), int64(tt.sectionSize)) defer s.Close() buf := make([]byte, tt.readSize) @@ -185,7 +185,7 @@ func TestReadAt(t *testing.T) { } t.Run(name, func(t *testing.T) { - rr.WithCachedReader(func() { + rr.WithCachedReader(context.Background(), func() { t.Run("cachedReader", testFn(rr)) }) @@ -232,7 +232,7 @@ func TestReadAtMultipart(t *testing.T) { // cachedReader should not make extra requests, the expectedCounter should always be the same counter = 1 t.Run("cachedReader", func(t *testing.T) { - rr.WithCachedReader(func() { + rr.WithCachedReader(context.Background(), func() { // "1234567890" assertReadAtFunc(t, bufLen, 0, testData[:bufLen], 2) // "abcdefghij" @@ -243,6 +243,39 @@ func TestReadAtMultipart(t *testing.T) { }) } +func TestReadContextCanceled(t *testing.T) { + testServer := newTestServer(t, nil) + defer testServer.Close() + + resource, err := NewResource(context.Background(), testServer.URL+"/resource") + require.NoError(t, err) + + rr := NewRangedReader(resource) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + t.Run("section_reader", func(t *testing.T) { + s := rr.SectionReader(ctx, 0, resource.Size) + + buf := make([]byte, resource.Size) + n, err := s.Read(buf) + require.Error(t, err) + require.Contains(t, err.Error(), "context canceled") + require.Zero(t, n) + }) + + t.Run("cached_reader", func(t *testing.T) { + rr.WithCachedReader(ctx, func() { + buf := make([]byte, resource.Size) + n, err := rr.ReadAt(buf, int64(0)) + require.Error(t, err) + require.Contains(t, err.Error(), "context canceled") + require.Zero(t, n) + }) + }) +} + func newTestServer(t *testing.T, do func()) *httptest.Server { t.Helper() diff --git a/internal/httprange/http_reader.go b/internal/httprange/http_reader.go index 4e7db6bc..17e92f65 100644 --- a/internal/httprange/http_reader.go +++ b/internal/httprange/http_reader.go @@ -1,6 +1,7 @@ package httprange import ( + "context" "errors" "fmt" "io" @@ -31,6 +32,9 @@ var ( // Reader holds a Resource and specifies ranges to read from at a time. // Implements the io.Reader, io.Seeker and io.Closer interfaces. type Reader struct { + // ctx for read requests + ctx context.Context + // Resource to read from Resource *Resource // res defines a current response serving data res *http.Response @@ -98,6 +102,8 @@ func (r *Reader) prepareRequest() (*http.Request, error) { return nil, err } + req = req.WithContext(r.ctx) + if r.Resource.ETag != "" { req.Header.Set("ETag", r.Resource.ETag) } else if r.Resource.LastModified != "" { @@ -199,6 +205,6 @@ func (r *Reader) Close() error { } // NewReader creates a Reader object on a given resource for a given range -func NewReader(resource *Resource, offset, size int64) *Reader { - return &Reader{Resource: resource, rangeStart: offset, rangeSize: size, offset: offset} +func NewReader(ctx context.Context, resource *Resource, offset, size int64) *Reader { + return &Reader{ctx: ctx, Resource: resource, rangeStart: offset, rangeSize: size, offset: offset} } diff --git a/internal/httprange/http_reader_test.go b/internal/httprange/http_reader_test.go index 507a7fe8..5e971575 100644 --- a/internal/httprange/http_reader_test.go +++ b/internal/httprange/http_reader_test.go @@ -175,7 +175,7 @@ func TestSeekAndRead(t *testing.T) { } for name, tt := range tests { t.Run(name, func(t *testing.T) { - r := NewReader(resource, tt.readerOffset, resource.Size-tt.readerOffset) + r := NewReader(context.Background(), resource, tt.readerOffset, resource.Size-tt.readerOffset) _, err := r.Seek(tt.seekOffset, tt.seekWhence) if tt.expectedSeekErrMsg != "" { @@ -233,7 +233,7 @@ func TestReaderSetResponse(t *testing.T) { } for name, tt := range tests { t.Run(name, func(t *testing.T) { - r := NewReader(&Resource{ETag: tt.prevETag}, tt.offset, 0) + r := NewReader(context.Background(), &Resource{ETag: tt.prevETag}, tt.offset, 0) res := &http.Response{StatusCode: tt.status, Header: map[string][]string{}} res.Header.Set("ETag", tt.resEtag) diff --git a/internal/vfs/zip/archive.go b/internal/vfs/zip/archive.go index a9265b08..ca9b778f 100644 --- a/internal/vfs/zip/archive.go +++ b/internal/vfs/zip/archive.go @@ -101,7 +101,7 @@ func (a *zipArchive) readArchive() { // load all archive files into memory using a cached ranged reader a.reader = httprange.NewRangedReader(a.resource) - a.reader.WithCachedReader(func() { + a.reader.WithCachedReader(ctx, func() { a.archive, a.err = zip.NewReader(a.reader, a.resource.Size) }) @@ -149,7 +149,7 @@ func (a *zipArchive) Open(ctx context.Context, name string) (vfs.File, error) { } // only read from dataOffset up to the size of the compressed file - reader := a.reader.SectionReader(dataOffset, int64(file.CompressedSize64)) + reader := a.reader.SectionReader(ctx, dataOffset, int64(file.CompressedSize64)) switch file.Method { case zip.Deflate: |