Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-pages.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVladimir Shushlin <vshushlin@gitlab.com>2020-09-22 16:37:33 +0300
committerVladimir Shushlin <vshushlin@gitlab.com>2020-09-22 16:37:33 +0300
commit7802bb75e8edafe05855fcbdb72aeea7bb906ae7 (patch)
tree5a322bb467f3fad0e3bd9ccc8d35e3cd64395a09
parent78a56d61856a6d499225dfabdfb3ff1d27eeb8d0 (diff)
parent1b018b63c65eb6ef54a39db1f0fa9194c45c4bae (diff)
Merge branch '443-pass-ctx-to-ranged-reader' into 'master'
Add ctx to httprange.Reader See merge request gitlab-org/gitlab-pages!356
-rw-r--r--internal/httprange/http_ranged_reader.go12
-rw-r--r--internal/httprange/http_ranged_reader_test.go39
-rw-r--r--internal/httprange/http_reader.go10
-rw-r--r--internal/httprange/http_reader_test.go4
-rw-r--r--internal/vfs/zip/archive.go4
5 files changed, 55 insertions, 14 deletions
diff --git a/internal/httprange/http_ranged_reader.go b/internal/httprange/http_ranged_reader.go
index d023521d..babff1aa 100644
--- a/internal/httprange/http_ranged_reader.go
+++ b/internal/httprange/http_ranged_reader.go
@@ -1,6 +1,7 @@
package httprange
import (
+ "context"
"io"
)
@@ -21,15 +22,16 @@ func (rr *RangedReader) cachedRead(buf []byte, off int64) (int, error) {
}
func (rr *RangedReader) ephemeralRead(buf []byte, offset int64) (n int, err error) {
- reader := NewReader(rr.Resource, offset, int64(len(buf)))
+ // we can use context.Background and rely on the Reader's httpClient timeout for ephemeral reads
+ reader := NewReader(context.Background(), rr.Resource, offset, int64(len(buf)))
defer reader.Close()
return io.ReadFull(reader, buf)
}
// SectionReader partitions a resource from `offset` with a specified `size`
-func (rr *RangedReader) SectionReader(offset, size int64) *Reader {
- return NewReader(rr.Resource, offset, size)
+func (rr *RangedReader) SectionReader(ctx context.Context, offset, size int64) *Reader {
+ return NewReader(ctx, rr.Resource, offset, size)
}
// ReadAt reads from cachedReader if exists, otherwise fetches a new Resource first.
@@ -44,8 +46,8 @@ func (rr *RangedReader) ReadAt(buf []byte, offset int64) (n int, err error) {
// WithCachedReader creates a Reader and saves it to the RangedReader instance.
// It takes a readFunc that will Seek the contents from Reader.
-func (rr *RangedReader) WithCachedReader(readFunc func()) {
- rr.cachedReader = NewReader(rr.Resource, 0, rr.Resource.Size)
+func (rr *RangedReader) WithCachedReader(ctx context.Context, readFunc func()) {
+ rr.cachedReader = NewReader(ctx, rr.Resource, 0, rr.Resource.Size)
defer func() {
rr.cachedReader.Close()
diff --git a/internal/httprange/http_ranged_reader_test.go b/internal/httprange/http_ranged_reader_test.go
index 72e645db..b17d06b1 100644
--- a/internal/httprange/http_ranged_reader_test.go
+++ b/internal/httprange/http_ranged_reader_test.go
@@ -86,7 +86,7 @@ func TestSectionReader(t *testing.T) {
for name, tt := range tests {
t.Run(name, func(t *testing.T) {
rr := NewRangedReader(resource)
- s := rr.SectionReader(int64(tt.sectionOffset), int64(tt.sectionSize))
+ s := rr.SectionReader(context.Background(), int64(tt.sectionOffset), int64(tt.sectionSize))
defer s.Close()
buf := make([]byte, tt.readSize)
@@ -185,7 +185,7 @@ func TestReadAt(t *testing.T) {
}
t.Run(name, func(t *testing.T) {
- rr.WithCachedReader(func() {
+ rr.WithCachedReader(context.Background(), func() {
t.Run("cachedReader", testFn(rr))
})
@@ -232,7 +232,7 @@ func TestReadAtMultipart(t *testing.T) {
// cachedReader should not make extra requests, the expectedCounter should always be the same
counter = 1
t.Run("cachedReader", func(t *testing.T) {
- rr.WithCachedReader(func() {
+ rr.WithCachedReader(context.Background(), func() {
// "1234567890"
assertReadAtFunc(t, bufLen, 0, testData[:bufLen], 2)
// "abcdefghij"
@@ -243,6 +243,39 @@ func TestReadAtMultipart(t *testing.T) {
})
}
+func TestReadContextCanceled(t *testing.T) {
+ testServer := newTestServer(t, nil)
+ defer testServer.Close()
+
+ resource, err := NewResource(context.Background(), testServer.URL+"/resource")
+ require.NoError(t, err)
+
+ rr := NewRangedReader(resource)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ cancel()
+
+ t.Run("section_reader", func(t *testing.T) {
+ s := rr.SectionReader(ctx, 0, resource.Size)
+
+ buf := make([]byte, resource.Size)
+ n, err := s.Read(buf)
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "context canceled")
+ require.Zero(t, n)
+ })
+
+ t.Run("cached_reader", func(t *testing.T) {
+ rr.WithCachedReader(ctx, func() {
+ buf := make([]byte, resource.Size)
+ n, err := rr.ReadAt(buf, int64(0))
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "context canceled")
+ require.Zero(t, n)
+ })
+ })
+}
+
func newTestServer(t *testing.T, do func()) *httptest.Server {
t.Helper()
diff --git a/internal/httprange/http_reader.go b/internal/httprange/http_reader.go
index 4e7db6bc..17e92f65 100644
--- a/internal/httprange/http_reader.go
+++ b/internal/httprange/http_reader.go
@@ -1,6 +1,7 @@
package httprange
import (
+ "context"
"errors"
"fmt"
"io"
@@ -31,6 +32,9 @@ var (
// Reader holds a Resource and specifies ranges to read from at a time.
// Implements the io.Reader, io.Seeker and io.Closer interfaces.
type Reader struct {
+ // ctx for read requests
+ ctx context.Context
+ // Resource to read from
Resource *Resource
// res defines a current response serving data
res *http.Response
@@ -98,6 +102,8 @@ func (r *Reader) prepareRequest() (*http.Request, error) {
return nil, err
}
+ req = req.WithContext(r.ctx)
+
if r.Resource.ETag != "" {
req.Header.Set("ETag", r.Resource.ETag)
} else if r.Resource.LastModified != "" {
@@ -199,6 +205,6 @@ func (r *Reader) Close() error {
}
// NewReader creates a Reader object on a given resource for a given range
-func NewReader(resource *Resource, offset, size int64) *Reader {
- return &Reader{Resource: resource, rangeStart: offset, rangeSize: size, offset: offset}
+func NewReader(ctx context.Context, resource *Resource, offset, size int64) *Reader {
+ return &Reader{ctx: ctx, Resource: resource, rangeStart: offset, rangeSize: size, offset: offset}
}
diff --git a/internal/httprange/http_reader_test.go b/internal/httprange/http_reader_test.go
index 507a7fe8..5e971575 100644
--- a/internal/httprange/http_reader_test.go
+++ b/internal/httprange/http_reader_test.go
@@ -175,7 +175,7 @@ func TestSeekAndRead(t *testing.T) {
}
for name, tt := range tests {
t.Run(name, func(t *testing.T) {
- r := NewReader(resource, tt.readerOffset, resource.Size-tt.readerOffset)
+ r := NewReader(context.Background(), resource, tt.readerOffset, resource.Size-tt.readerOffset)
_, err := r.Seek(tt.seekOffset, tt.seekWhence)
if tt.expectedSeekErrMsg != "" {
@@ -233,7 +233,7 @@ func TestReaderSetResponse(t *testing.T) {
}
for name, tt := range tests {
t.Run(name, func(t *testing.T) {
- r := NewReader(&Resource{ETag: tt.prevETag}, tt.offset, 0)
+ r := NewReader(context.Background(), &Resource{ETag: tt.prevETag}, tt.offset, 0)
res := &http.Response{StatusCode: tt.status, Header: map[string][]string{}}
res.Header.Set("ETag", tt.resEtag)
diff --git a/internal/vfs/zip/archive.go b/internal/vfs/zip/archive.go
index a9265b08..ca9b778f 100644
--- a/internal/vfs/zip/archive.go
+++ b/internal/vfs/zip/archive.go
@@ -101,7 +101,7 @@ func (a *zipArchive) readArchive() {
// load all archive files into memory using a cached ranged reader
a.reader = httprange.NewRangedReader(a.resource)
- a.reader.WithCachedReader(func() {
+ a.reader.WithCachedReader(ctx, func() {
a.archive, a.err = zip.NewReader(a.reader, a.resource.Size)
})
@@ -149,7 +149,7 @@ func (a *zipArchive) Open(ctx context.Context, name string) (vfs.File, error) {
}
// only read from dataOffset up to the size of the compressed file
- reader := a.reader.SectionReader(dataOffset, int64(file.CompressedSize64))
+ reader := a.reader.SectionReader(ctx, dataOffset, int64(file.CompressedSize64))
switch file.Method {
case zip.Deflate: