Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Couder <chriscool@tuxfamily.org>2021-04-06 22:24:48 +0300
committerChristian Couder <chriscool@tuxfamily.org>2021-04-06 22:24:48 +0300
commita799599594b1ffb7225c775c16b9005702c0f1a3 (patch)
tree3fdf42ba1715f72cca1601f063bedead6bc1cab4
parent8e62ebd1ba44273e5571b7d8f1088109e99d70f7 (diff)
parent81d6b873b7eae737bd4374df39eb52c546e912e0 (diff)
Merge branch 'jv-streamcache-pipe-close' into 'master'
Close streamcache writer on all return paths See merge request gitlab-org/gitaly!3335
-rw-r--r--changelogs/unreleased/jv-streamcache-pipe-close.yml5
-rw-r--r--internal/streamcache/pipe.go4
-rw-r--r--internal/streamcache/pipe_test.go65
3 files changed, 55 insertions, 19 deletions
diff --git a/changelogs/unreleased/jv-streamcache-pipe-close.yml b/changelogs/unreleased/jv-streamcache-pipe-close.yml
new file mode 100644
index 000000000..518259014
--- /dev/null
+++ b/changelogs/unreleased/jv-streamcache-pipe-close.yml
@@ -0,0 +1,5 @@
+---
+title: Close streamcache writer on all return paths
+merge_request: 3335
+author:
+type: fixed
diff --git a/internal/streamcache/pipe.go b/internal/streamcache/pipe.go
index 6d61c3b57..131f21cbf 100644
--- a/internal/streamcache/pipe.go
+++ b/internal/streamcache/pipe.go
@@ -124,6 +124,8 @@ func (p *pipe) Close() error {
p.m.Lock()
defer p.m.Unlock()
+ errClose := p.w.Close()
+
if p.writerClosed() {
return errWriterAlreadyClosed
}
@@ -136,7 +138,7 @@ func (p *pipe) Close() error {
p.rcursor.Unsubscribe(p.wnotifier)
p.writerClosedFirst = true
- return p.w.Close()
+ return errClose
}
func (p *pipe) initialReadersClosed() bool {
diff --git a/internal/streamcache/pipe_test.go b/internal/streamcache/pipe_test.go
index 1f0ccac8b..e4e56214f 100644
--- a/internal/streamcache/pipe_test.go
+++ b/internal/streamcache/pipe_test.go
@@ -13,7 +13,7 @@ import (
"github.com/stretchr/testify/require"
)
-func createPipe(t *testing.T) (io.ReadCloser, *pipe, func()) {
+func createPipe(t *testing.T) (io.ReadCloser, *pipe) {
t.Helper()
f, err := ioutil.TempFile("", "gitaly-streamcache-test")
@@ -22,10 +22,12 @@ func createPipe(t *testing.T) (io.ReadCloser, *pipe, func()) {
pr, p, err := newPipe(f)
require.NoError(t, err)
- return pr, p, func() {
+ t.Cleanup(func() {
_ = p.RemoveFile()
p.Close()
- }
+ })
+
+ return pr, p
}
func writeBytes(w io.WriteCloser, buf []byte, progress *int64) error {
@@ -45,8 +47,7 @@ func writeBytes(w io.WriteCloser, buf []byte, progress *int64) error {
}
func TestPipe(t *testing.T) {
- pr, p, clean := createPipe(t)
- defer clean()
+ pr, p := createPipe(t)
readers := []io.ReadCloser{pr}
defer func() {
@@ -89,8 +90,7 @@ func TestPipe(t *testing.T) {
}
func TestPipe_readAfterClose(t *testing.T) {
- pr1, p, clean := createPipe(t)
- defer clean()
+ pr1, p := createPipe(t)
defer pr1.Close()
defer p.Close()
@@ -115,8 +115,7 @@ func TestPipe_readAfterClose(t *testing.T) {
}
func TestPipe_backpressure(t *testing.T) {
- pr, p, clean := createPipe(t)
- defer clean()
+ pr, p := createPipe(t)
defer p.Close()
defer pr.Close()
@@ -150,8 +149,7 @@ func TestPipe_backpressure(t *testing.T) {
}
func TestPipe_closeWhenAllReadersLeave(t *testing.T) {
- pr1, p, clean := createPipe(t)
- defer clean()
+ pr1, p := createPipe(t)
defer p.Close()
defer pr1.Close()
@@ -189,30 +187,62 @@ func TestPipe_closeWhenAllReadersLeave(t *testing.T) {
require.Error(t, <-werr, "writer should see error if all readers close before writer is done")
}
+type closeSpy struct {
+ namedWriteCloser
+ closed bool
+}
+
+func (cs *closeSpy) Close() error {
+ cs.closed = true
+ return cs.namedWriteCloser.Close()
+}
+
// Closing the last reader _before_ closing the writer is a failure
// condition. After this happens, opening new readers should fail.
func TestPipe_closeWrongOrder(t *testing.T) {
- pr, p, clean := createPipe(t)
- defer clean()
+ f, err := ioutil.TempFile("", "gitaly-streamcache-test")
+ require.NoError(t, err)
+ cs := &closeSpy{namedWriteCloser: f}
+
+ pr, p, err := newPipe(cs)
+ require.NoError(t, err)
+
+ defer func() {
+ _ = p.RemoveFile()
+ p.Close()
+ }()
+
defer p.Close()
defer pr.Close()
require.NoError(t, pr.Close(), "close last reader")
require.Equal(t, errWrongCloseOrder, p.Close(), "closing writer should fail if all readers went away")
+ require.True(t, cs.closed)
- _, err := p.OpenReader()
+ _, err = p.OpenReader()
require.Equal(t, errWrongCloseOrder, err, "opening reader after 'broken close' should fail")
}
// Closing last reader after closing the writer is the happy path. After
// this happens, opening new readers should work.
func TestPipe_closeOrderHappy(t *testing.T) {
- pr1, p, clean := createPipe(t)
- defer clean()
+ f, err := ioutil.TempFile("", "gitaly-streamcache-test")
+ require.NoError(t, err)
+ cs := &closeSpy{namedWriteCloser: f}
+
+ pr1, p, err := newPipe(cs)
+ require.NoError(t, err)
+
+ defer func() {
+ _ = p.RemoveFile()
+ p.Close()
+ }()
+
defer p.Close()
defer pr1.Close()
require.NoError(t, p.Close())
+ require.True(t, cs.closed)
out1, err := ioutil.ReadAll(pr1)
require.NoError(t, err)
@@ -228,8 +258,7 @@ func TestPipe_closeOrderHappy(t *testing.T) {
}
func TestPipe_concurrency(t *testing.T) {
- pr, p, clean := createPipe(t)
- defer clean()
+ pr, p := createPipe(t)
defer p.Close()
defer pr.Close()