diff options
author | Christian Couder <chriscool@tuxfamily.org> | 2021-04-06 22:24:48 +0300 |
---|---|---|
committer | Christian Couder <chriscool@tuxfamily.org> | 2021-04-06 22:24:48 +0300 |
commit | a799599594b1ffb7225c775c16b9005702c0f1a3 (patch) | |
tree | 3fdf42ba1715f72cca1601f063bedead6bc1cab4 | |
parent | 8e62ebd1ba44273e5571b7d8f1088109e99d70f7 (diff) | |
parent | 81d6b873b7eae737bd4374df39eb52c546e912e0 (diff) |
Merge branch 'jv-streamcache-pipe-close' into 'master'
Close streamcache writer on all return paths
See merge request gitlab-org/gitaly!3335
-rw-r--r-- | changelogs/unreleased/jv-streamcache-pipe-close.yml | 5 | ||||
-rw-r--r-- | internal/streamcache/pipe.go | 4 | ||||
-rw-r--r-- | internal/streamcache/pipe_test.go | 65 |
3 files changed, 55 insertions, 19 deletions
diff --git a/changelogs/unreleased/jv-streamcache-pipe-close.yml b/changelogs/unreleased/jv-streamcache-pipe-close.yml new file mode 100644 index 000000000..518259014 --- /dev/null +++ b/changelogs/unreleased/jv-streamcache-pipe-close.yml @@ -0,0 +1,5 @@ +--- +title: Close streamcache writer on all return paths +merge_request: 3335 +author: +type: fixed diff --git a/internal/streamcache/pipe.go b/internal/streamcache/pipe.go index 6d61c3b57..131f21cbf 100644 --- a/internal/streamcache/pipe.go +++ b/internal/streamcache/pipe.go @@ -124,6 +124,8 @@ func (p *pipe) Close() error { p.m.Lock() defer p.m.Unlock() + errClose := p.w.Close() + if p.writerClosed() { return errWriterAlreadyClosed } @@ -136,7 +138,7 @@ func (p *pipe) Close() error { p.rcursor.Unsubscribe(p.wnotifier) p.writerClosedFirst = true - return p.w.Close() + return errClose } func (p *pipe) initialReadersClosed() bool { diff --git a/internal/streamcache/pipe_test.go b/internal/streamcache/pipe_test.go index 1f0ccac8b..e4e56214f 100644 --- a/internal/streamcache/pipe_test.go +++ b/internal/streamcache/pipe_test.go @@ -13,7 +13,7 @@ import ( "github.com/stretchr/testify/require" ) -func createPipe(t *testing.T) (io.ReadCloser, *pipe, func()) { +func createPipe(t *testing.T) (io.ReadCloser, *pipe) { t.Helper() f, err := ioutil.TempFile("", "gitaly-streamcache-test") @@ -22,10 +22,12 @@ func createPipe(t *testing.T) (io.ReadCloser, *pipe, func()) { pr, p, err := newPipe(f) require.NoError(t, err) - return pr, p, func() { + t.Cleanup(func() { _ = p.RemoveFile() p.Close() - } + }) + + return pr, p } func writeBytes(w io.WriteCloser, buf []byte, progress *int64) error { @@ -45,8 +47,7 @@ func writeBytes(w io.WriteCloser, buf []byte, progress *int64) error { } func TestPipe(t *testing.T) { - pr, p, clean := createPipe(t) - defer clean() + pr, p := createPipe(t) readers := []io.ReadCloser{pr} defer func() { @@ -89,8 +90,7 @@ func TestPipe(t *testing.T) { } func TestPipe_readAfterClose(t *testing.T) { - pr1, p, clean := createPipe(t) - defer clean() + pr1, p := createPipe(t) defer pr1.Close() defer p.Close() @@ -115,8 +115,7 @@ func TestPipe_readAfterClose(t *testing.T) { } func TestPipe_backpressure(t *testing.T) { - pr, p, clean := createPipe(t) - defer clean() + pr, p := createPipe(t) defer p.Close() defer pr.Close() @@ -150,8 +149,7 @@ func TestPipe_backpressure(t *testing.T) { } func TestPipe_closeWhenAllReadersLeave(t *testing.T) { - pr1, p, clean := createPipe(t) - defer clean() + pr1, p := createPipe(t) defer p.Close() defer pr1.Close() @@ -189,30 +187,62 @@ func TestPipe_closeWhenAllReadersLeave(t *testing.T) { require.Error(t, <-werr, "writer should see error if all readers close before writer is done") } +type closeSpy struct { + namedWriteCloser + closed bool +} + +func (cs *closeSpy) Close() error { + cs.closed = true + return cs.namedWriteCloser.Close() +} + // Closing the last reader _before_ closing the writer is a failure // condition. After this happens, opening new readers should fail. func TestPipe_closeWrongOrder(t *testing.T) { - pr, p, clean := createPipe(t) - defer clean() + f, err := ioutil.TempFile("", "gitaly-streamcache-test") + require.NoError(t, err) + cs := &closeSpy{namedWriteCloser: f} + + pr, p, err := newPipe(cs) + require.NoError(t, err) + + defer func() { + _ = p.RemoveFile() + p.Close() + }() + defer p.Close() defer pr.Close() require.NoError(t, pr.Close(), "close last reader") require.Equal(t, errWrongCloseOrder, p.Close(), "closing writer should fail if all readers went away") + require.True(t, cs.closed) - _, err := p.OpenReader() + _, err = p.OpenReader() require.Equal(t, errWrongCloseOrder, err, "opening reader after 'broken close' should fail") } // Closing last reader after closing the writer is the happy path. After // this happens, opening new readers should work. func TestPipe_closeOrderHappy(t *testing.T) { - pr1, p, clean := createPipe(t) - defer clean() + f, err := ioutil.TempFile("", "gitaly-streamcache-test") + require.NoError(t, err) + cs := &closeSpy{namedWriteCloser: f} + + pr1, p, err := newPipe(cs) + require.NoError(t, err) + + defer func() { + _ = p.RemoveFile() + p.Close() + }() + defer p.Close() defer pr1.Close() require.NoError(t, p.Close()) + require.True(t, cs.closed) out1, err := ioutil.ReadAll(pr1) require.NoError(t, err) @@ -228,8 +258,7 @@ func TestPipe_closeOrderHappy(t *testing.T) { } func TestPipe_concurrency(t *testing.T) { - pr, p, clean := createPipe(t) - defer clean() + pr, p := createPipe(t) defer p.Close() defer pr.Close() |