Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2020-11-02 10:22:06 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2020-11-02 12:19:33 +0300
commite848de6246569a7d1840ded4b4a015515b608eb6 (patch)
treec88c633067a2a5edeebd26b57cf577e835671b5a
parent98fe9f19fcecf4b9d27ab60dfaf6d127447a0350 (diff)
streamio: Implement synchronized writer
Writing to gRPC streams concurrently isn't allowed, which is why we need to synchronize those writes. Doing so with the `NewWriter()` is repetitive, so this commit implements a new function `NewSyncWrite()` which receives a mutex in addition to the callback function. The mutex is then locked and unlocked previous to invoking the callback.
-rw-r--r--streamio/stream.go15
-rw-r--r--streamio/stream_test.go22
2 files changed, 37 insertions, 0 deletions
diff --git a/streamio/stream.go b/streamio/stream.go
index 3e90f0e42..85a325d84 100644
--- a/streamio/stream.go
+++ b/streamio/stream.go
@@ -8,6 +8,7 @@ import (
"io"
"os"
"strconv"
+ "sync"
"github.com/prometheus/client_golang/prometheus"
)
@@ -102,6 +103,20 @@ func NewWriter(sender func(p []byte) error) io.Writer {
return &sendWriter{sender: sender}
}
+// NewSyncWriter turns sender into an io.Writer. The sender callback will
+// receive []byte arguments of length at most WriteBufferSize. All calls to the
+// sender will be synchronized via the mutex.
+func NewSyncWriter(m *sync.Mutex, sender func(p []byte) error) io.Writer {
+ return &sendWriter{
+ sender: func(p []byte) error {
+ m.Lock()
+ defer m.Unlock()
+
+ return sender(p)
+ },
+ }
+}
+
// WriteBufferSize is the largest []byte that Write() will pass to its
// underlying send function. This value can be changed at runtime using
// the GITALY_STREAMIO_WRITE_BUFFER_SIZE environment variable.
diff --git a/streamio/stream_test.go b/streamio/stream_test.go
index e4796913d..fbc2944cf 100644
--- a/streamio/stream_test.go
+++ b/streamio/stream_test.go
@@ -6,6 +6,7 @@ import (
"io"
"io/ioutil"
"strings"
+ "sync"
"testing"
"testing/iotest"
@@ -103,6 +104,27 @@ func TestWriterChunking(t *testing.T) {
}
}
+func TestNewSyncWriter(t *testing.T) {
+ var m sync.Mutex
+ testData := "Hello this is some test data"
+ ts := &testSender{}
+
+ w := NewSyncWriter(&m, func(p []byte) error {
+ // As there is no way to check whether a mutex is locked already, we can just try to
+ // unlock it here. If the mutex wasn't locked, it would cause a runtime error. As
+ // there's no concurrent writers in this test, this is safe to do.
+ m.Unlock()
+ m.Lock()
+
+ return ts.send(p)
+ })
+
+ _, err := io.CopyBuffer(&opaqueWriter{w}, strings.NewReader(testData), make([]byte, 10))
+ require.NoError(t, err)
+
+ require.Equal(t, testData, string(bytes.Join(ts.sends, nil)))
+}
+
type testSender struct {
sends [][]byte
}