From 87ead969b1fe8a7067b906409a864fa0432096b9 Mon Sep 17 00:00:00 2001 From: GitLab Renovate Bot Date: Fri, 8 Jul 2022 15:17:41 +0000 Subject: go: Update github.com/hashicorp/yamux digest to 0bc27b2 --- NOTICE | 132 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---- go.mod | 2 +- go.sum | 3 +- 3 files changed, 127 insertions(+), 10 deletions(-) diff --git a/NOTICE b/NOTICE index d963932ff..e0f130882 100644 --- a/NOTICE +++ b/NOTICE @@ -13611,6 +13611,25 @@ import ( "fmt" ) +// NetError implements net.Error +type NetError struct { + err error + timeout bool + temporary bool +} + +func (e *NetError) Error() string { + return e.err.Error() +} + +func (e *NetError) Timeout() bool { + return e.timeout +} + +func (e *NetError) Temporary() bool { + return e.temporary +} + var ( // ErrInvalidVersion means we received a frame with an // invalid version @@ -13636,7 +13655,13 @@ var ( ErrRecvWindowExceeded = fmt.Errorf("recv window exceeded") // ErrTimeout is used when we reach an IO deadline - ErrTimeout = fmt.Errorf("i/o deadline reached") + ErrTimeout = &NetError{ + err: fmt.Errorf("i/o deadline reached"), + + // Error should meet net.Error interface for timeouts for compatability + // with standard library expectations, such as http servers. + timeout: true, + } // ErrStreamClosed is returned when using a closed stream ErrStreamClosed = fmt.Errorf("stream closed") @@ -13878,6 +13903,13 @@ type Config struct { // window size that we allow for a stream. MaxStreamWindowSize uint32 + // StreamOpenTimeout is the maximum amount of time that a stream will + // be allowed to remain in pending state while waiting for an ack from the peer. + // Once the timeout is reached the session will be gracefully closed. + // A zero value disables the StreamOpenTimeout allowing unbounded + // blocking on OpenStream calls. + StreamOpenTimeout time.Duration + // StreamCloseTimeout is the maximum time that a stream will allowed to // be in a half-closed state when `Close` is called before forcibly // closing the connection. Forcibly closed connections will empty the @@ -13903,6 +13935,7 @@ func DefaultConfig() *Config { ConnectionWriteTimeout: 10 * time.Second, MaxStreamWindowSize: initialStreamWindow, StreamCloseTimeout: 5 * time.Minute, + StreamOpenTimeout: 75 * time.Second, LogOutput: os.Stderr, } } @@ -14036,7 +14069,7 @@ type Session struct { // or to directly send a header type sendReady struct { Hdr []byte - Body io.Reader + Body []byte Err chan error } @@ -14140,6 +14173,10 @@ GET_ID: s.inflight[id] = struct{}{} s.streamLock.Unlock() + if s.config.StreamOpenTimeout > 0 { + go s.setOpenTimeout(stream) + } + // Send the window update to create if err := stream.sendWindowUpdate(); err != nil { select { @@ -14152,6 +14189,27 @@ GET_ID: return stream, nil } +// setOpenTimeout implements a timeout for streams that are opened but not established. +// If the StreamOpenTimeout is exceeded we assume the peer is unable to ACK, +// and close the session. +// The number of running timers is bounded by the capacity of the synCh. +func (s *Session) setOpenTimeout(stream *Stream) { + timer := time.NewTimer(s.config.StreamOpenTimeout) + defer timer.Stop() + + select { + case <-stream.establishCh: + return + case <-s.shutdownCh: + return + case <-timer.C: + // Timeout reached while waiting for ACK. + // Close the session to force connection re-establishment. + s.logger.Printf("[ERR] yamux: aborted stream open (destination=%s): %v", s.RemoteAddr().String(), ErrTimeout.err) + s.Close() + } +} + // Accept is used to block until the next available stream // is ready to be accepted. func (s *Session) Accept() (net.Conn, error) { @@ -14283,7 +14341,7 @@ func (s *Session) keepalive() { } // waitForSendErr waits to send a header, checking for a potential shutdown -func (s *Session) waitForSend(hdr header, body io.Reader) error { +func (s *Session) waitForSend(hdr header, body []byte) error { errCh := make(chan error, 1) return s.waitForSendErr(hdr, body, errCh) } @@ -14291,7 +14349,7 @@ func (s *Session) waitForSend(hdr header, body io.Reader) error { // waitForSendErr waits to send a header with optional data, checking for a // potential shutdown. Since there's the expectation that sends can happen // in a timely manner, we enforce the connection write timeout here. -func (s *Session) waitForSendErr(hdr header, body io.Reader, errCh chan error) error { +func (s *Session) waitForSendErr(hdr header, body []byte, errCh chan error) error { t := timerPool.Get() timer := t.(*time.Timer) timer.Reset(s.config.ConnectionWriteTimeout) @@ -14371,7 +14429,7 @@ func (s *Session) send() { // Send data from a body if given if ready.Body != nil { - _, err := io.Copy(s.conn, ready.Body) + _, err := s.conn.Write(ready.Body) if err != nil { s.logger.Printf("[ERR] yamux: Failed to write body: %v", err) asyncSendErr(ready.Err, err) @@ -14618,6 +14676,7 @@ import ( "io" "io/ioutil" "log" + "net" "reflect" "runtime" "strings" @@ -14885,6 +14944,39 @@ func TestAccept(t *testing.T) { } } +func TestOpenStreamTimeout(t *testing.T) { + const timeout = 25 * time.Millisecond + + cfg := testConf() + cfg.StreamOpenTimeout = timeout + + client, server := testClientServerConfig(cfg) + defer client.Close() + defer server.Close() + + clientLogs := captureLogs(client) + + // Open a single stream without a server to acknowledge it. + s, err := client.OpenStream() + if err != nil { + t.Fatal(err) + } + + // Sleep for longer than the stream open timeout. + // Since no ACKs are received, the stream and session should be closed. + time.Sleep(timeout * 5) + + if !clientLogs.match([]string{"[ERR] yamux: aborted stream open (destination=yamux:remote): i/o deadline reached"}) { + t.Fatalf("server log incorect: %v", clientLogs.logs()) + } + if s.state != streamClosed { + t.Fatalf("stream should have been closed") + } + if !client.IsClosed() { + t.Fatalf("session should have been closed") + } +} + func TestClose_closeTimeout(t *testing.T) { conf := testConf() conf.StreamCloseTimeout = 10 * time.Millisecond @@ -15375,9 +15467,27 @@ func TestReadDeadline(t *testing.T) { } buf := make([]byte, 4) - if _, err := stream.Read(buf); err != ErrTimeout { + _, err = stream.Read(buf) + if err != ErrTimeout { t.Fatalf("err: %v", err) } + + // See https://github.com/hashicorp/yamux/issues/90 + // The standard library's http server package will read from connections in + // the background to detect if they are alive. + // + // It sets a read deadline on connections and detect if the returned error + // is a network timeout error which implements net.Error. + // + // The HTTP server will cancel all server requests if it isn't timeout error + // from the connection. + // + // We assert that we return an error meeting the interface to avoid + // accidently breaking yamux session compatability with the standard + // library's http server implementation. + if netErr, ok := err.(net.Error); !ok || !netErr.Timeout() { + t.Fatalf("reading timeout error is expected to implement net.Error and return true when calling Timeout()") + } } func TestReadDeadline_BlockedRead(t *testing.T) { @@ -16231,6 +16341,9 @@ type Stream struct { readDeadline atomic.Value // time.Time writeDeadline atomic.Value // time.Time + // establishCh is notified if the stream is established or being closed. + establishCh chan struct{} + // closeTimer is set with stateLock held to honor the StreamCloseTimeout // setting on Session. closeTimer *time.Timer @@ -16251,6 +16364,7 @@ func newStream(session *Session, id uint32, state streamState) *Stream { sendWindow: initialStreamWindow, recvNotifyCh: make(chan struct{}, 1), sendNotifyCh: make(chan struct{}, 1), + establishCh: make(chan struct{}, 1), } s.readDeadline.Store(time.Time{}) s.writeDeadline.Store(time.Time{}) @@ -16346,7 +16460,7 @@ func (s *Stream) Write(b []byte) (n int, err error) { func (s *Stream) write(b []byte) (n int, err error) { var flags uint16 var max uint32 - var body io.Reader + var body []byte START: s.stateLock.Lock() switch s.state { @@ -16372,7 +16486,7 @@ START: // Send up to our send window max = min(window, uint32(len(b))) - body = bytes.NewReader(b[:max]) + body = b[:max] // Send the header s.sendHdr.encode(typeData, flags, s.id, max) @@ -16574,6 +16688,7 @@ func (s *Stream) processFlags(flags uint16) error { if s.state == streamSYNSent { s.state = streamEstablished } + asyncNotify(s.establishCh) s.session.establishStream(s.id) } if flags&flagFIN == flagFIN { @@ -16606,6 +16721,7 @@ func (s *Stream) processFlags(flags uint16) error { func (s *Stream) notifyWaiting() { asyncNotify(s.recvNotifyCh) asyncNotify(s.sendNotifyCh) + asyncNotify(s.establishCh) } // incrSendWindow updates the size of our send window diff --git a/go.mod b/go.mod index 0b7441847..17df17b6f 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/hashicorp/golang-lru v0.5.4 - github.com/hashicorp/yamux v0.0.0-20210316155119-a95892c5f864 + github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87 github.com/jackc/pgconn v1.12.1 github.com/jackc/pgtype v1.11.0 github.com/jackc/pgx/v4 v4.16.1 diff --git a/go.sum b/go.sum index 4c2afd61c..fb3daca97 100644 --- a/go.sum +++ b/go.sum @@ -618,8 +618,9 @@ github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ= github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I= github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc= -github.com/hashicorp/yamux v0.0.0-20210316155119-a95892c5f864 h1:Y4V+SFe7d3iH+9pJCoeWIOS5/xBJIFsltS7E+KJSsJY= github.com/hashicorp/yamux v0.0.0-20210316155119-a95892c5f864/go.mod h1:CtWFDAQgb7dxtzFs4tWbplKIe2jSi3+5vKbgIO0SLnQ= +github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87 h1:xixZ2bWeofWV68J+x6AzmKuVM/JWCQwkWm6GW/MUR6I= +github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87/go.mod h1:CtWFDAQgb7dxtzFs4tWbplKIe2jSi3+5vKbgIO0SLnQ= github.com/hhatto/gorst v0.0.0-20181029133204-ca9f730cac5b h1:Jdu2tbAxkRouSILp2EbposIb8h4gO+2QuZEn3d9sKAc= github.com/hhatto/gorst v0.0.0-20181029133204-ca9f730cac5b/go.mod h1:HmaZGXHdSwQh1jnUlBGN2BeEYOHACLVGzYOXCbsLvxY= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= -- cgit v1.2.3