Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2022-08-05 18:24:35 +0300
committerJohn Cai <jcai@gitlab.com>2022-08-05 18:24:35 +0300
commitf6b8999896ca00ee4b764bf083a3d232a269f6c7 (patch)
tree0b99d9880b68a9b28957022af6551c19c708527e
parentc9c3a8b33af988bc027bea0a4e88637faa745741 (diff)
parent133d3faa9d56b430741cabf93da781e28c20a36d (diff)
Merge branch 'renovate/github.com-hashicorp-yamux-0.x' into 'master'
go: Update module github.com/hashicorp/yamux to v0.1.1 See merge request gitlab-org/gitaly!4749
-rw-r--r--NOTICE212
-rw-r--r--go.mod2
-rw-r--r--go.sum4
3 files changed, 176 insertions, 42 deletions
diff --git a/NOTICE b/NOTICE
index 5b3f91ef2..f6b450f8b 100644
--- a/NOTICE
+++ b/NOTICE
@@ -13991,6 +13991,7 @@ package yamux
import (
"bufio"
+ "bytes"
"fmt"
"io"
"io/ioutil"
@@ -14052,23 +14053,26 @@ type Session struct {
// sendCh is used to mark a stream as ready to send,
// or to send a header out directly.
- sendCh chan sendReady
+ sendCh chan *sendReady
// recvDoneCh is closed when recv() exits to avoid a race
// between stream registration and stream shutdown
recvDoneCh chan struct{}
+ sendDoneCh chan struct{}
// shutdown is used to safely close a session
- shutdown bool
- shutdownErr error
- shutdownCh chan struct{}
- shutdownLock sync.Mutex
+ shutdown bool
+ shutdownErr error
+ shutdownCh chan struct{}
+ shutdownLock sync.Mutex
+ shutdownErrLock sync.Mutex
}
// sendReady is used to either mark a stream as ready
// or to directly send a header
type sendReady struct {
Hdr []byte
+ mu sync.Mutex // Protects Body from unsafe reads.
Body []byte
Err chan error
}
@@ -14090,8 +14094,9 @@ func newSession(config *Config, conn io.ReadWriteCloser, client bool) *Session {
inflight: make(map[uint32]struct{}),
synCh: make(chan struct{}, config.AcceptBacklog),
acceptCh: make(chan *Stream, config.AcceptBacklog),
- sendCh: make(chan sendReady, 64),
+ sendCh: make(chan *sendReady, 64),
recvDoneCh: make(chan struct{}),
+ sendDoneCh: make(chan struct{}),
shutdownCh: make(chan struct{}),
}
if client {
@@ -14244,10 +14249,15 @@ func (s *Session) Close() error {
return nil
}
s.shutdown = true
+
+ s.shutdownErrLock.Lock()
if s.shutdownErr == nil {
s.shutdownErr = ErrSessionShutdown
}
+ s.shutdownErrLock.Unlock()
+
close(s.shutdownCh)
+
s.conn.Close()
<-s.recvDoneCh
@@ -14256,17 +14266,18 @@ func (s *Session) Close() error {
for _, stream := range s.streams {
stream.forceClose()
}
+ <-s.sendDoneCh
return nil
}
// exitErr is used to handle an error that is causing the
// session to terminate.
func (s *Session) exitErr(err error) {
- s.shutdownLock.Lock()
+ s.shutdownErrLock.Lock()
if s.shutdownErr == nil {
s.shutdownErr = err
}
- s.shutdownLock.Unlock()
+ s.shutdownErrLock.Unlock()
s.Close()
}
@@ -14362,7 +14373,7 @@ func (s *Session) waitForSendErr(hdr header, body []byte, errCh chan error) erro
timerPool.Put(t)
}()
- ready := sendReady{Hdr: hdr, Body: body, Err: errCh}
+ ready := &sendReady{Hdr: hdr, Body: body, Err: errCh}
select {
case s.sendCh <- ready:
case <-s.shutdownCh:
@@ -14371,12 +14382,34 @@ func (s *Session) waitForSendErr(hdr header, body []byte, errCh chan error) erro
return ErrConnectionWriteTimeout
}
+ bodyCopy := func() {
+ if body == nil {
+ return // A nil body is ignored.
+ }
+
+ // In the event of session shutdown or connection write timeout,
+ // we need to prevent `send` from reading the body buffer after
+ // returning from this function since the caller may re-use the
+ // underlying array.
+ ready.mu.Lock()
+ defer ready.mu.Unlock()
+
+ if ready.Body == nil {
+ return // Body was already copied in `send`.
+ }
+ newBody := make([]byte, len(body))
+ copy(newBody, body)
+ ready.Body = newBody
+ }
+
select {
case err := <-errCh:
return err
case <-s.shutdownCh:
+ bodyCopy()
return ErrSessionShutdown
case <-timer.C:
+ bodyCopy()
return ErrConnectionWriteTimeout
}
}
@@ -14398,7 +14431,7 @@ func (s *Session) sendNoWait(hdr header) error {
}()
select {
- case s.sendCh <- sendReady{Hdr: hdr}:
+ case s.sendCh <- &sendReady{Hdr: hdr}:
return nil
case <-s.shutdownCh:
return ErrSessionShutdown
@@ -14409,39 +14442,59 @@ func (s *Session) sendNoWait(hdr header) error {
// send is a long running goroutine that sends data
func (s *Session) send() {
+ if err := s.sendLoop(); err != nil {
+ s.exitErr(err)
+ }
+}
+
+func (s *Session) sendLoop() error {
+ defer close(s.sendDoneCh)
+ var bodyBuf bytes.Buffer
for {
+ bodyBuf.Reset()
+
select {
case ready := <-s.sendCh:
// Send a header if ready
if ready.Hdr != nil {
- sent := 0
- for sent < len(ready.Hdr) {
- n, err := s.conn.Write(ready.Hdr[sent:])
- if err != nil {
- s.logger.Printf("[ERR] yamux: Failed to write header: %v", err)
- asyncSendErr(ready.Err, err)
- s.exitErr(err)
- return
- }
- sent += n
+ _, err := s.conn.Write(ready.Hdr)
+ if err != nil {
+ s.logger.Printf("[ERR] yamux: Failed to write header: %v", err)
+ asyncSendErr(ready.Err, err)
+ return err
}
}
- // Send data from a body if given
+ ready.mu.Lock()
if ready.Body != nil {
- _, err := s.conn.Write(ready.Body)
+ // Copy the body into the buffer to avoid
+ // holding a mutex lock during the write.
+ _, err := bodyBuf.Write(ready.Body)
+ if err != nil {
+ ready.Body = nil
+ ready.mu.Unlock()
+ s.logger.Printf("[ERR] yamux: Failed to copy body into buffer: %v", err)
+ asyncSendErr(ready.Err, err)
+ return err
+ }
+ ready.Body = nil
+ }
+ ready.mu.Unlock()
+
+ if bodyBuf.Len() > 0 {
+ // Send data from a body if given
+ _, err := s.conn.Write(bodyBuf.Bytes())
if err != nil {
s.logger.Printf("[ERR] yamux: Failed to write body: %v", err)
asyncSendErr(ready.Err, err)
- s.exitErr(err)
- return
+ return err
}
}
// No error, successful send
asyncSendErr(ready.Err, nil)
case <-s.shutdownCh:
- return
+ return nil
}
}
}
@@ -14628,8 +14681,9 @@ func (s *Session) incomingStream(id uint32) error {
// Backlog exceeded! RST the stream
s.logger.Printf("[WARN] yamux: backlog exceeded, forcing connection reset")
delete(s.streams, id)
- stream.sendHdr.encode(typeWindowUpdate, flagRST, id, 0)
- return s.sendNoWait(stream.sendHdr)
+ hdr := header(make([]byte, headerSize))
+ hdr.encode(typeWindowUpdate, flagRST, id, 0)
+ return s.sendNoWait(hdr)
}
}
@@ -15133,16 +15187,16 @@ func TestSendData_Small(t *testing.T) {
}()
select {
case <-doneCh:
+ if client.NumStreams() != 0 {
+ t.Fatalf("bad")
+ }
+ if server.NumStreams() != 0 {
+ t.Fatalf("bad")
+ }
+ return
case <-time.After(time.Second):
panic("timeout")
}
-
- if client.NumStreams() != 0 {
- t.Fatalf("bad")
- }
- if server.NumStreams() != 0 {
- t.Fatalf("bad")
- }
}
func TestSendData_Large(t *testing.T) {
@@ -15213,7 +15267,6 @@ func TestSendData_Large(t *testing.T) {
t.Fatalf("err: %v", err)
}
}()
-
doneCh := make(chan struct{})
go func() {
wg.Wait()
@@ -15221,6 +15274,7 @@ func TestSendData_Large(t *testing.T) {
}()
select {
case <-doneCh:
+ return
case <-time.After(5 * time.Second):
panic("timeout")
}
@@ -15445,6 +15499,62 @@ func TestHalfClose(t *testing.T) {
}
}
+func TestHalfCloseSessionShutdown(t *testing.T) {
+ client, server := testClientServer()
+ defer client.Close()
+ defer server.Close()
+
+ // dataSize must be large enough to ensure the server will send a window
+ // update
+ dataSize := int64(server.config.MaxStreamWindowSize)
+
+ data := make([]byte, dataSize)
+ for idx := range data {
+ data[idx] = byte(idx % 256)
+ }
+
+ stream, err := client.Open()
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ if _, err = stream.Write(data); err != nil {
+ t.Fatalf("err: %v", err)
+ }
+
+ stream2, err := server.Accept()
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+
+ if err := stream.Close(); err != nil {
+ t.Fatalf("err: %v", err)
+ }
+
+ // Shut down the session of the sending side. This should not cause reads
+ // to fail on the receiving side.
+ if err := client.Close(); err != nil {
+ t.Fatalf("err: %v", err)
+ }
+
+ buf := make([]byte, dataSize)
+ n, err := stream2.Read(buf)
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ if int64(n) != dataSize {
+ t.Fatalf("bad: %v", n)
+ }
+
+ // EOF after close
+ n, err = stream2.Read(buf)
+ if err != io.EOF {
+ t.Fatalf("err: %v", err)
+ }
+ if n != 0 {
+ t.Fatalf("bad: %v", n)
+ }
+}
+
func TestReadDeadline(t *testing.T) {
client, server := testClientServer()
defer client.Close()
@@ -15717,6 +15827,8 @@ func TestKeepAlive_Timeout(t *testing.T) {
t.Fatalf("timeout waiting for timeout")
}
+ clientConn.writeBlocker.Unlock()
+
if !server.IsClosed() {
t.Fatalf("server should have closed")
}
@@ -15913,6 +16025,7 @@ func TestSession_WindowUpdateWriteDuringRead(t *testing.T) {
conn := client.conn.(*pipeConn)
conn.writeBlocker.Lock()
+ defer conn.writeBlocker.Unlock()
_, err = stream.Read(make([]byte, flood))
if err != ErrConnectionWriteTimeout {
@@ -16008,6 +16121,7 @@ func TestSession_sendNoWait_Timeout(t *testing.T) {
conn := client.conn.(*pipeConn)
conn.writeBlocker.Lock()
+ defer conn.writeBlocker.Unlock()
hdr := header(make([]byte, headerSize))
hdr.encode(typePing, flagACK, 0, 0)
@@ -16128,6 +16242,7 @@ func TestSession_ConnectionWriteTimeout(t *testing.T) {
conn := client.conn.(*pipeConn)
conn.writeBlocker.Lock()
+ defer conn.writeBlocker.Unlock()
// Since the write goroutine is blocked then this will return a
// timeout since it can't get feedback about whether the write
@@ -16293,6 +16408,7 @@ package yamux
import (
"bytes"
+ "errors"
"io"
"sync"
"sync/atomic"
@@ -16418,6 +16534,9 @@ START:
// Send a window update potentially
err = s.sendWindowUpdate()
+ if err == ErrSessionShutdown {
+ err = nil
+ }
return n, err
WAIT:
@@ -16491,6 +16610,10 @@ START:
// Send the header
s.sendHdr.encode(typeData, flags, s.id, max)
if err = s.session.waitForSendErr(s.sendHdr, body, s.sendErr); err != nil {
+ if errors.Is(err, ErrSessionShutdown) || errors.Is(err, ErrConnectionWriteTimeout) {
+ // Message left in ready queue, header re-use is unsafe.
+ s.sendHdr = header(make([]byte, headerSize))
+ }
return 0, err
}
@@ -16564,6 +16687,10 @@ func (s *Stream) sendWindowUpdate() error {
// Send the header
s.controlHdr.encode(typeWindowUpdate, flags, s.id, delta)
if err := s.session.waitForSendErr(s.controlHdr, nil, s.controlErr); err != nil {
+ if errors.Is(err, ErrSessionShutdown) || errors.Is(err, ErrConnectionWriteTimeout) {
+ // Message left in ready queue, header re-use is unsafe.
+ s.controlHdr = header(make([]byte, headerSize))
+ }
return err
}
return nil
@@ -16578,6 +16705,10 @@ func (s *Stream) sendClose() error {
flags |= flagFIN
s.controlHdr.encode(typeWindowUpdate, flags, s.id, 0)
if err := s.session.waitForSendErr(s.controlHdr, nil, s.controlErr); err != nil {
+ if errors.Is(err, ErrSessionShutdown) || errors.Is(err, ErrConnectionWriteTimeout) {
+ // Message left in ready queue, header re-use is unsafe.
+ s.controlHdr = header(make([]byte, headerSize))
+ }
return err
}
return nil
@@ -16653,8 +16784,9 @@ func (s *Stream) closeTimeout() {
// Send a RST so the remote side closes too.
s.sendLock.Lock()
defer s.sendLock.Unlock()
- s.sendHdr.encode(typeWindowUpdate, flagRST, s.id, 0)
- s.session.sendNoWait(s.sendHdr)
+ hdr := header(make([]byte, headerSize))
+ hdr.encode(typeWindowUpdate, flagRST, s.id, 0)
+ s.session.sendNoWait(hdr)
}
// forceClose is used for when the session is exiting
@@ -16756,6 +16888,7 @@ func (s *Stream) readData(hdr header, flags uint16, conn io.Reader) error {
if length > s.recvWindow {
s.session.logger.Printf("[ERR] yamux: receive window exceeded (stream: %d, remain: %d, recv: %d)", s.id, s.recvWindow, length)
+ s.recvLock.Unlock()
return ErrRecvWindowExceeded
}
@@ -16764,14 +16897,15 @@ func (s *Stream) readData(hdr header, flags uint16, conn io.Reader) error {
// This way we can read in the whole packet without further allocations.
s.recvBuf = bytes.NewBuffer(make([]byte, 0, length))
}
- if _, err := io.Copy(s.recvBuf, conn); err != nil {
+ copiedLength, err := io.Copy(s.recvBuf, conn)
+ if err != nil {
s.session.logger.Printf("[ERR] yamux: Failed to read stream data: %v", err)
s.recvLock.Unlock()
return err
}
// Decrement the receive window
- s.recvWindow -= length
+ s.recvWindow -= uint32(copiedLength)
s.recvLock.Unlock()
// Unblock any readers
diff --git a/go.mod b/go.mod
index 07c5c9e0c..3d3f489cf 100644
--- a/go.mod
+++ b/go.mod
@@ -15,7 +15,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/hashicorp/golang-lru v0.5.4
- github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87
+ github.com/hashicorp/yamux v0.1.1
github.com/jackc/pgconn v1.12.1
github.com/jackc/pgtype v1.11.0
github.com/jackc/pgx/v4 v4.16.1
diff --git a/go.sum b/go.sum
index 28a2e118c..93b2fd0b1 100644
--- a/go.sum
+++ b/go.sum
@@ -648,8 +648,8 @@ github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0m
github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I=
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
github.com/hashicorp/yamux v0.0.0-20210316155119-a95892c5f864/go.mod h1:CtWFDAQgb7dxtzFs4tWbplKIe2jSi3+5vKbgIO0SLnQ=
-github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87 h1:xixZ2bWeofWV68J+x6AzmKuVM/JWCQwkWm6GW/MUR6I=
-github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87/go.mod h1:CtWFDAQgb7dxtzFs4tWbplKIe2jSi3+5vKbgIO0SLnQ=
+github.com/hashicorp/yamux v0.1.1 h1:yrQxtgseBDrq9Y652vSRDvsKCJKOUD+GzTS4Y0Y8pvE=
+github.com/hashicorp/yamux v0.1.1/go.mod h1:CtWFDAQgb7dxtzFs4tWbplKIe2jSi3+5vKbgIO0SLnQ=
github.com/hhatto/gorst v0.0.0-20181029133204-ca9f730cac5b h1:Jdu2tbAxkRouSILp2EbposIb8h4gO+2QuZEn3d9sKAc=
github.com/hhatto/gorst v0.0.0-20181029133204-ca9f730cac5b/go.mod h1:HmaZGXHdSwQh1jnUlBGN2BeEYOHACLVGzYOXCbsLvxY=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=