diff options
author | John Cai <jcai@gitlab.com> | 2022-08-05 18:24:35 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2022-08-05 18:24:35 +0300 |
commit | f6b8999896ca00ee4b764bf083a3d232a269f6c7 (patch) | |
tree | 0b99d9880b68a9b28957022af6551c19c708527e | |
parent | c9c3a8b33af988bc027bea0a4e88637faa745741 (diff) | |
parent | 133d3faa9d56b430741cabf93da781e28c20a36d (diff) |
Merge branch 'renovate/github.com-hashicorp-yamux-0.x' into 'master'
go: Update module github.com/hashicorp/yamux to v0.1.1
See merge request gitlab-org/gitaly!4749
-rw-r--r-- | NOTICE | 212 | ||||
-rw-r--r-- | go.mod | 2 | ||||
-rw-r--r-- | go.sum | 4 |
3 files changed, 176 insertions, 42 deletions
@@ -13991,6 +13991,7 @@ package yamux import ( "bufio" + "bytes" "fmt" "io" "io/ioutil" @@ -14052,23 +14053,26 @@ type Session struct { // sendCh is used to mark a stream as ready to send, // or to send a header out directly. - sendCh chan sendReady + sendCh chan *sendReady // recvDoneCh is closed when recv() exits to avoid a race // between stream registration and stream shutdown recvDoneCh chan struct{} + sendDoneCh chan struct{} // shutdown is used to safely close a session - shutdown bool - shutdownErr error - shutdownCh chan struct{} - shutdownLock sync.Mutex + shutdown bool + shutdownErr error + shutdownCh chan struct{} + shutdownLock sync.Mutex + shutdownErrLock sync.Mutex } // sendReady is used to either mark a stream as ready // or to directly send a header type sendReady struct { Hdr []byte + mu sync.Mutex // Protects Body from unsafe reads. Body []byte Err chan error } @@ -14090,8 +14094,9 @@ func newSession(config *Config, conn io.ReadWriteCloser, client bool) *Session { inflight: make(map[uint32]struct{}), synCh: make(chan struct{}, config.AcceptBacklog), acceptCh: make(chan *Stream, config.AcceptBacklog), - sendCh: make(chan sendReady, 64), + sendCh: make(chan *sendReady, 64), recvDoneCh: make(chan struct{}), + sendDoneCh: make(chan struct{}), shutdownCh: make(chan struct{}), } if client { @@ -14244,10 +14249,15 @@ func (s *Session) Close() error { return nil } s.shutdown = true + + s.shutdownErrLock.Lock() if s.shutdownErr == nil { s.shutdownErr = ErrSessionShutdown } + s.shutdownErrLock.Unlock() + close(s.shutdownCh) + s.conn.Close() <-s.recvDoneCh @@ -14256,17 +14266,18 @@ func (s *Session) Close() error { for _, stream := range s.streams { stream.forceClose() } + <-s.sendDoneCh return nil } // exitErr is used to handle an error that is causing the // session to terminate. func (s *Session) exitErr(err error) { - s.shutdownLock.Lock() + s.shutdownErrLock.Lock() if s.shutdownErr == nil { s.shutdownErr = err } - s.shutdownLock.Unlock() + s.shutdownErrLock.Unlock() s.Close() } @@ -14362,7 +14373,7 @@ func (s *Session) waitForSendErr(hdr header, body []byte, errCh chan error) erro timerPool.Put(t) }() - ready := sendReady{Hdr: hdr, Body: body, Err: errCh} + ready := &sendReady{Hdr: hdr, Body: body, Err: errCh} select { case s.sendCh <- ready: case <-s.shutdownCh: @@ -14371,12 +14382,34 @@ func (s *Session) waitForSendErr(hdr header, body []byte, errCh chan error) erro return ErrConnectionWriteTimeout } + bodyCopy := func() { + if body == nil { + return // A nil body is ignored. + } + + // In the event of session shutdown or connection write timeout, + // we need to prevent `send` from reading the body buffer after + // returning from this function since the caller may re-use the + // underlying array. + ready.mu.Lock() + defer ready.mu.Unlock() + + if ready.Body == nil { + return // Body was already copied in `send`. + } + newBody := make([]byte, len(body)) + copy(newBody, body) + ready.Body = newBody + } + select { case err := <-errCh: return err case <-s.shutdownCh: + bodyCopy() return ErrSessionShutdown case <-timer.C: + bodyCopy() return ErrConnectionWriteTimeout } } @@ -14398,7 +14431,7 @@ func (s *Session) sendNoWait(hdr header) error { }() select { - case s.sendCh <- sendReady{Hdr: hdr}: + case s.sendCh <- &sendReady{Hdr: hdr}: return nil case <-s.shutdownCh: return ErrSessionShutdown @@ -14409,39 +14442,59 @@ func (s *Session) sendNoWait(hdr header) error { // send is a long running goroutine that sends data func (s *Session) send() { + if err := s.sendLoop(); err != nil { + s.exitErr(err) + } +} + +func (s *Session) sendLoop() error { + defer close(s.sendDoneCh) + var bodyBuf bytes.Buffer for { + bodyBuf.Reset() + select { case ready := <-s.sendCh: // Send a header if ready if ready.Hdr != nil { - sent := 0 - for sent < len(ready.Hdr) { - n, err := s.conn.Write(ready.Hdr[sent:]) - if err != nil { - s.logger.Printf("[ERR] yamux: Failed to write header: %v", err) - asyncSendErr(ready.Err, err) - s.exitErr(err) - return - } - sent += n + _, err := s.conn.Write(ready.Hdr) + if err != nil { + s.logger.Printf("[ERR] yamux: Failed to write header: %v", err) + asyncSendErr(ready.Err, err) + return err } } - // Send data from a body if given + ready.mu.Lock() if ready.Body != nil { - _, err := s.conn.Write(ready.Body) + // Copy the body into the buffer to avoid + // holding a mutex lock during the write. + _, err := bodyBuf.Write(ready.Body) + if err != nil { + ready.Body = nil + ready.mu.Unlock() + s.logger.Printf("[ERR] yamux: Failed to copy body into buffer: %v", err) + asyncSendErr(ready.Err, err) + return err + } + ready.Body = nil + } + ready.mu.Unlock() + + if bodyBuf.Len() > 0 { + // Send data from a body if given + _, err := s.conn.Write(bodyBuf.Bytes()) if err != nil { s.logger.Printf("[ERR] yamux: Failed to write body: %v", err) asyncSendErr(ready.Err, err) - s.exitErr(err) - return + return err } } // No error, successful send asyncSendErr(ready.Err, nil) case <-s.shutdownCh: - return + return nil } } } @@ -14628,8 +14681,9 @@ func (s *Session) incomingStream(id uint32) error { // Backlog exceeded! RST the stream s.logger.Printf("[WARN] yamux: backlog exceeded, forcing connection reset") delete(s.streams, id) - stream.sendHdr.encode(typeWindowUpdate, flagRST, id, 0) - return s.sendNoWait(stream.sendHdr) + hdr := header(make([]byte, headerSize)) + hdr.encode(typeWindowUpdate, flagRST, id, 0) + return s.sendNoWait(hdr) } } @@ -15133,16 +15187,16 @@ func TestSendData_Small(t *testing.T) { }() select { case <-doneCh: + if client.NumStreams() != 0 { + t.Fatalf("bad") + } + if server.NumStreams() != 0 { + t.Fatalf("bad") + } + return case <-time.After(time.Second): panic("timeout") } - - if client.NumStreams() != 0 { - t.Fatalf("bad") - } - if server.NumStreams() != 0 { - t.Fatalf("bad") - } } func TestSendData_Large(t *testing.T) { @@ -15213,7 +15267,6 @@ func TestSendData_Large(t *testing.T) { t.Fatalf("err: %v", err) } }() - doneCh := make(chan struct{}) go func() { wg.Wait() @@ -15221,6 +15274,7 @@ func TestSendData_Large(t *testing.T) { }() select { case <-doneCh: + return case <-time.After(5 * time.Second): panic("timeout") } @@ -15445,6 +15499,62 @@ func TestHalfClose(t *testing.T) { } } +func TestHalfCloseSessionShutdown(t *testing.T) { + client, server := testClientServer() + defer client.Close() + defer server.Close() + + // dataSize must be large enough to ensure the server will send a window + // update + dataSize := int64(server.config.MaxStreamWindowSize) + + data := make([]byte, dataSize) + for idx := range data { + data[idx] = byte(idx % 256) + } + + stream, err := client.Open() + if err != nil { + t.Fatalf("err: %v", err) + } + if _, err = stream.Write(data); err != nil { + t.Fatalf("err: %v", err) + } + + stream2, err := server.Accept() + if err != nil { + t.Fatalf("err: %v", err) + } + + if err := stream.Close(); err != nil { + t.Fatalf("err: %v", err) + } + + // Shut down the session of the sending side. This should not cause reads + // to fail on the receiving side. + if err := client.Close(); err != nil { + t.Fatalf("err: %v", err) + } + + buf := make([]byte, dataSize) + n, err := stream2.Read(buf) + if err != nil { + t.Fatalf("err: %v", err) + } + if int64(n) != dataSize { + t.Fatalf("bad: %v", n) + } + + // EOF after close + n, err = stream2.Read(buf) + if err != io.EOF { + t.Fatalf("err: %v", err) + } + if n != 0 { + t.Fatalf("bad: %v", n) + } +} + func TestReadDeadline(t *testing.T) { client, server := testClientServer() defer client.Close() @@ -15717,6 +15827,8 @@ func TestKeepAlive_Timeout(t *testing.T) { t.Fatalf("timeout waiting for timeout") } + clientConn.writeBlocker.Unlock() + if !server.IsClosed() { t.Fatalf("server should have closed") } @@ -15913,6 +16025,7 @@ func TestSession_WindowUpdateWriteDuringRead(t *testing.T) { conn := client.conn.(*pipeConn) conn.writeBlocker.Lock() + defer conn.writeBlocker.Unlock() _, err = stream.Read(make([]byte, flood)) if err != ErrConnectionWriteTimeout { @@ -16008,6 +16121,7 @@ func TestSession_sendNoWait_Timeout(t *testing.T) { conn := client.conn.(*pipeConn) conn.writeBlocker.Lock() + defer conn.writeBlocker.Unlock() hdr := header(make([]byte, headerSize)) hdr.encode(typePing, flagACK, 0, 0) @@ -16128,6 +16242,7 @@ func TestSession_ConnectionWriteTimeout(t *testing.T) { conn := client.conn.(*pipeConn) conn.writeBlocker.Lock() + defer conn.writeBlocker.Unlock() // Since the write goroutine is blocked then this will return a // timeout since it can't get feedback about whether the write @@ -16293,6 +16408,7 @@ package yamux import ( "bytes" + "errors" "io" "sync" "sync/atomic" @@ -16418,6 +16534,9 @@ START: // Send a window update potentially err = s.sendWindowUpdate() + if err == ErrSessionShutdown { + err = nil + } return n, err WAIT: @@ -16491,6 +16610,10 @@ START: // Send the header s.sendHdr.encode(typeData, flags, s.id, max) if err = s.session.waitForSendErr(s.sendHdr, body, s.sendErr); err != nil { + if errors.Is(err, ErrSessionShutdown) || errors.Is(err, ErrConnectionWriteTimeout) { + // Message left in ready queue, header re-use is unsafe. + s.sendHdr = header(make([]byte, headerSize)) + } return 0, err } @@ -16564,6 +16687,10 @@ func (s *Stream) sendWindowUpdate() error { // Send the header s.controlHdr.encode(typeWindowUpdate, flags, s.id, delta) if err := s.session.waitForSendErr(s.controlHdr, nil, s.controlErr); err != nil { + if errors.Is(err, ErrSessionShutdown) || errors.Is(err, ErrConnectionWriteTimeout) { + // Message left in ready queue, header re-use is unsafe. + s.controlHdr = header(make([]byte, headerSize)) + } return err } return nil @@ -16578,6 +16705,10 @@ func (s *Stream) sendClose() error { flags |= flagFIN s.controlHdr.encode(typeWindowUpdate, flags, s.id, 0) if err := s.session.waitForSendErr(s.controlHdr, nil, s.controlErr); err != nil { + if errors.Is(err, ErrSessionShutdown) || errors.Is(err, ErrConnectionWriteTimeout) { + // Message left in ready queue, header re-use is unsafe. + s.controlHdr = header(make([]byte, headerSize)) + } return err } return nil @@ -16653,8 +16784,9 @@ func (s *Stream) closeTimeout() { // Send a RST so the remote side closes too. s.sendLock.Lock() defer s.sendLock.Unlock() - s.sendHdr.encode(typeWindowUpdate, flagRST, s.id, 0) - s.session.sendNoWait(s.sendHdr) + hdr := header(make([]byte, headerSize)) + hdr.encode(typeWindowUpdate, flagRST, s.id, 0) + s.session.sendNoWait(hdr) } // forceClose is used for when the session is exiting @@ -16756,6 +16888,7 @@ func (s *Stream) readData(hdr header, flags uint16, conn io.Reader) error { if length > s.recvWindow { s.session.logger.Printf("[ERR] yamux: receive window exceeded (stream: %d, remain: %d, recv: %d)", s.id, s.recvWindow, length) + s.recvLock.Unlock() return ErrRecvWindowExceeded } @@ -16764,14 +16897,15 @@ func (s *Stream) readData(hdr header, flags uint16, conn io.Reader) error { // This way we can read in the whole packet without further allocations. s.recvBuf = bytes.NewBuffer(make([]byte, 0, length)) } - if _, err := io.Copy(s.recvBuf, conn); err != nil { + copiedLength, err := io.Copy(s.recvBuf, conn) + if err != nil { s.session.logger.Printf("[ERR] yamux: Failed to read stream data: %v", err) s.recvLock.Unlock() return err } // Decrement the receive window - s.recvWindow -= length + s.recvWindow -= uint32(copiedLength) s.recvLock.Unlock() // Unblock any readers @@ -15,7 +15,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/hashicorp/golang-lru v0.5.4 - github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87 + github.com/hashicorp/yamux v0.1.1 github.com/jackc/pgconn v1.12.1 github.com/jackc/pgtype v1.11.0 github.com/jackc/pgx/v4 v4.16.1 @@ -648,8 +648,8 @@ github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0m github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I= github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc= github.com/hashicorp/yamux v0.0.0-20210316155119-a95892c5f864/go.mod h1:CtWFDAQgb7dxtzFs4tWbplKIe2jSi3+5vKbgIO0SLnQ= -github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87 h1:xixZ2bWeofWV68J+x6AzmKuVM/JWCQwkWm6GW/MUR6I= -github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87/go.mod h1:CtWFDAQgb7dxtzFs4tWbplKIe2jSi3+5vKbgIO0SLnQ= +github.com/hashicorp/yamux v0.1.1 h1:yrQxtgseBDrq9Y652vSRDvsKCJKOUD+GzTS4Y0Y8pvE= +github.com/hashicorp/yamux v0.1.1/go.mod h1:CtWFDAQgb7dxtzFs4tWbplKIe2jSi3+5vKbgIO0SLnQ= github.com/hhatto/gorst v0.0.0-20181029133204-ca9f730cac5b h1:Jdu2tbAxkRouSILp2EbposIb8h4gO+2QuZEn3d9sKAc= github.com/hhatto/gorst v0.0.0-20181029133204-ca9f730cac5b/go.mod h1:HmaZGXHdSwQh1jnUlBGN2BeEYOHACLVGzYOXCbsLvxY= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= |