diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2020-11-02 10:22:06 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2020-11-02 12:19:33 +0300 |
commit | e848de6246569a7d1840ded4b4a015515b608eb6 (patch) | |
tree | c88c633067a2a5edeebd26b57cf577e835671b5a /streamio | |
parent | 98fe9f19fcecf4b9d27ab60dfaf6d127447a0350 (diff) |
streamio: Implement synchronized writer
Writing to gRPC streams concurrently isn't allowed, which is why we need
to synchronize those writes. Doing so with the `NewWriter()` is
repetitive, so this commit implements a new function `NewSyncWrite()`
which receives a mutex in addition to the callback function. The mutex
is then locked and unlocked previous to invoking the callback.
Diffstat (limited to 'streamio')
-rw-r--r-- | streamio/stream.go | 15 | ||||
-rw-r--r-- | streamio/stream_test.go | 22 |
2 files changed, 37 insertions, 0 deletions
diff --git a/streamio/stream.go b/streamio/stream.go index 3e90f0e42..85a325d84 100644 --- a/streamio/stream.go +++ b/streamio/stream.go @@ -8,6 +8,7 @@ import ( "io" "os" "strconv" + "sync" "github.com/prometheus/client_golang/prometheus" ) @@ -102,6 +103,20 @@ func NewWriter(sender func(p []byte) error) io.Writer { return &sendWriter{sender: sender} } +// NewSyncWriter turns sender into an io.Writer. The sender callback will +// receive []byte arguments of length at most WriteBufferSize. All calls to the +// sender will be synchronized via the mutex. +func NewSyncWriter(m *sync.Mutex, sender func(p []byte) error) io.Writer { + return &sendWriter{ + sender: func(p []byte) error { + m.Lock() + defer m.Unlock() + + return sender(p) + }, + } +} + // WriteBufferSize is the largest []byte that Write() will pass to its // underlying send function. This value can be changed at runtime using // the GITALY_STREAMIO_WRITE_BUFFER_SIZE environment variable. diff --git a/streamio/stream_test.go b/streamio/stream_test.go index e4796913d..fbc2944cf 100644 --- a/streamio/stream_test.go +++ b/streamio/stream_test.go @@ -6,6 +6,7 @@ import ( "io" "io/ioutil" "strings" + "sync" "testing" "testing/iotest" @@ -103,6 +104,27 @@ func TestWriterChunking(t *testing.T) { } } +func TestNewSyncWriter(t *testing.T) { + var m sync.Mutex + testData := "Hello this is some test data" + ts := &testSender{} + + w := NewSyncWriter(&m, func(p []byte) error { + // As there is no way to check whether a mutex is locked already, we can just try to + // unlock it here. If the mutex wasn't locked, it would cause a runtime error. As + // there's no concurrent writers in this test, this is safe to do. + m.Unlock() + m.Lock() + + return ts.send(p) + }) + + _, err := io.CopyBuffer(&opaqueWriter{w}, strings.NewReader(testData), make([]byte, 10)) + require.NoError(t, err) + + require.Equal(t, testData, string(bytes.Join(ts.sends, nil))) +} + type testSender struct { sends [][]byte } |