Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGitLab Renovate Bot <gitlab-bot@gitlab.com>2022-07-08 18:17:41 +0300
committerGitLab Renovate Bot <gitlab-bot@gitlab.com>2022-07-08 18:17:41 +0300
commit87ead969b1fe8a7067b906409a864fa0432096b9 (patch)
tree395ee584b6a67bd7e246fc7a58404dadcc211a89
parent823c15369c005fc51a4598cf98adc60d49d30dcd (diff)
go: Update github.com/hashicorp/yamux digest to 0bc27b2
-rw-r--r--NOTICE132
-rw-r--r--go.mod2
-rw-r--r--go.sum3
3 files changed, 127 insertions, 10 deletions
diff --git a/NOTICE b/NOTICE
index d963932ff..e0f130882 100644
--- a/NOTICE
+++ b/NOTICE
@@ -13611,6 +13611,25 @@ import (
"fmt"
)
+// NetError implements net.Error
+type NetError struct {
+ err error
+ timeout bool
+ temporary bool
+}
+
+func (e *NetError) Error() string {
+ return e.err.Error()
+}
+
+func (e *NetError) Timeout() bool {
+ return e.timeout
+}
+
+func (e *NetError) Temporary() bool {
+ return e.temporary
+}
+
var (
// ErrInvalidVersion means we received a frame with an
// invalid version
@@ -13636,7 +13655,13 @@ var (
ErrRecvWindowExceeded = fmt.Errorf("recv window exceeded")
// ErrTimeout is used when we reach an IO deadline
- ErrTimeout = fmt.Errorf("i/o deadline reached")
+ ErrTimeout = &NetError{
+ err: fmt.Errorf("i/o deadline reached"),
+
+ // Error should meet net.Error interface for timeouts for compatability
+ // with standard library expectations, such as http servers.
+ timeout: true,
+ }
// ErrStreamClosed is returned when using a closed stream
ErrStreamClosed = fmt.Errorf("stream closed")
@@ -13878,6 +13903,13 @@ type Config struct {
// window size that we allow for a stream.
MaxStreamWindowSize uint32
+ // StreamOpenTimeout is the maximum amount of time that a stream will
+ // be allowed to remain in pending state while waiting for an ack from the peer.
+ // Once the timeout is reached the session will be gracefully closed.
+ // A zero value disables the StreamOpenTimeout allowing unbounded
+ // blocking on OpenStream calls.
+ StreamOpenTimeout time.Duration
+
// StreamCloseTimeout is the maximum time that a stream will allowed to
// be in a half-closed state when `Close` is called before forcibly
// closing the connection. Forcibly closed connections will empty the
@@ -13903,6 +13935,7 @@ func DefaultConfig() *Config {
ConnectionWriteTimeout: 10 * time.Second,
MaxStreamWindowSize: initialStreamWindow,
StreamCloseTimeout: 5 * time.Minute,
+ StreamOpenTimeout: 75 * time.Second,
LogOutput: os.Stderr,
}
}
@@ -14036,7 +14069,7 @@ type Session struct {
// or to directly send a header
type sendReady struct {
Hdr []byte
- Body io.Reader
+ Body []byte
Err chan error
}
@@ -14140,6 +14173,10 @@ GET_ID:
s.inflight[id] = struct{}{}
s.streamLock.Unlock()
+ if s.config.StreamOpenTimeout > 0 {
+ go s.setOpenTimeout(stream)
+ }
+
// Send the window update to create
if err := stream.sendWindowUpdate(); err != nil {
select {
@@ -14152,6 +14189,27 @@ GET_ID:
return stream, nil
}
+// setOpenTimeout implements a timeout for streams that are opened but not established.
+// If the StreamOpenTimeout is exceeded we assume the peer is unable to ACK,
+// and close the session.
+// The number of running timers is bounded by the capacity of the synCh.
+func (s *Session) setOpenTimeout(stream *Stream) {
+ timer := time.NewTimer(s.config.StreamOpenTimeout)
+ defer timer.Stop()
+
+ select {
+ case <-stream.establishCh:
+ return
+ case <-s.shutdownCh:
+ return
+ case <-timer.C:
+ // Timeout reached while waiting for ACK.
+ // Close the session to force connection re-establishment.
+ s.logger.Printf("[ERR] yamux: aborted stream open (destination=%s): %v", s.RemoteAddr().String(), ErrTimeout.err)
+ s.Close()
+ }
+}
+
// Accept is used to block until the next available stream
// is ready to be accepted.
func (s *Session) Accept() (net.Conn, error) {
@@ -14283,7 +14341,7 @@ func (s *Session) keepalive() {
}
// waitForSendErr waits to send a header, checking for a potential shutdown
-func (s *Session) waitForSend(hdr header, body io.Reader) error {
+func (s *Session) waitForSend(hdr header, body []byte) error {
errCh := make(chan error, 1)
return s.waitForSendErr(hdr, body, errCh)
}
@@ -14291,7 +14349,7 @@ func (s *Session) waitForSend(hdr header, body io.Reader) error {
// waitForSendErr waits to send a header with optional data, checking for a
// potential shutdown. Since there's the expectation that sends can happen
// in a timely manner, we enforce the connection write timeout here.
-func (s *Session) waitForSendErr(hdr header, body io.Reader, errCh chan error) error {
+func (s *Session) waitForSendErr(hdr header, body []byte, errCh chan error) error {
t := timerPool.Get()
timer := t.(*time.Timer)
timer.Reset(s.config.ConnectionWriteTimeout)
@@ -14371,7 +14429,7 @@ func (s *Session) send() {
// Send data from a body if given
if ready.Body != nil {
- _, err := io.Copy(s.conn, ready.Body)
+ _, err := s.conn.Write(ready.Body)
if err != nil {
s.logger.Printf("[ERR] yamux: Failed to write body: %v", err)
asyncSendErr(ready.Err, err)
@@ -14618,6 +14676,7 @@ import (
"io"
"io/ioutil"
"log"
+ "net"
"reflect"
"runtime"
"strings"
@@ -14885,6 +14944,39 @@ func TestAccept(t *testing.T) {
}
}
+func TestOpenStreamTimeout(t *testing.T) {
+ const timeout = 25 * time.Millisecond
+
+ cfg := testConf()
+ cfg.StreamOpenTimeout = timeout
+
+ client, server := testClientServerConfig(cfg)
+ defer client.Close()
+ defer server.Close()
+
+ clientLogs := captureLogs(client)
+
+ // Open a single stream without a server to acknowledge it.
+ s, err := client.OpenStream()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // Sleep for longer than the stream open timeout.
+ // Since no ACKs are received, the stream and session should be closed.
+ time.Sleep(timeout * 5)
+
+ if !clientLogs.match([]string{"[ERR] yamux: aborted stream open (destination=yamux:remote): i/o deadline reached"}) {
+ t.Fatalf("server log incorect: %v", clientLogs.logs())
+ }
+ if s.state != streamClosed {
+ t.Fatalf("stream should have been closed")
+ }
+ if !client.IsClosed() {
+ t.Fatalf("session should have been closed")
+ }
+}
+
func TestClose_closeTimeout(t *testing.T) {
conf := testConf()
conf.StreamCloseTimeout = 10 * time.Millisecond
@@ -15375,9 +15467,27 @@ func TestReadDeadline(t *testing.T) {
}
buf := make([]byte, 4)
- if _, err := stream.Read(buf); err != ErrTimeout {
+ _, err = stream.Read(buf)
+ if err != ErrTimeout {
t.Fatalf("err: %v", err)
}
+
+ // See https://github.com/hashicorp/yamux/issues/90
+ // The standard library's http server package will read from connections in
+ // the background to detect if they are alive.
+ //
+ // It sets a read deadline on connections and detect if the returned error
+ // is a network timeout error which implements net.Error.
+ //
+ // The HTTP server will cancel all server requests if it isn't timeout error
+ // from the connection.
+ //
+ // We assert that we return an error meeting the interface to avoid
+ // accidently breaking yamux session compatability with the standard
+ // library's http server implementation.
+ if netErr, ok := err.(net.Error); !ok || !netErr.Timeout() {
+ t.Fatalf("reading timeout error is expected to implement net.Error and return true when calling Timeout()")
+ }
}
func TestReadDeadline_BlockedRead(t *testing.T) {
@@ -16231,6 +16341,9 @@ type Stream struct {
readDeadline atomic.Value // time.Time
writeDeadline atomic.Value // time.Time
+ // establishCh is notified if the stream is established or being closed.
+ establishCh chan struct{}
+
// closeTimer is set with stateLock held to honor the StreamCloseTimeout
// setting on Session.
closeTimer *time.Timer
@@ -16251,6 +16364,7 @@ func newStream(session *Session, id uint32, state streamState) *Stream {
sendWindow: initialStreamWindow,
recvNotifyCh: make(chan struct{}, 1),
sendNotifyCh: make(chan struct{}, 1),
+ establishCh: make(chan struct{}, 1),
}
s.readDeadline.Store(time.Time{})
s.writeDeadline.Store(time.Time{})
@@ -16346,7 +16460,7 @@ func (s *Stream) Write(b []byte) (n int, err error) {
func (s *Stream) write(b []byte) (n int, err error) {
var flags uint16
var max uint32
- var body io.Reader
+ var body []byte
START:
s.stateLock.Lock()
switch s.state {
@@ -16372,7 +16486,7 @@ START:
// Send up to our send window
max = min(window, uint32(len(b)))
- body = bytes.NewReader(b[:max])
+ body = b[:max]
// Send the header
s.sendHdr.encode(typeData, flags, s.id, max)
@@ -16574,6 +16688,7 @@ func (s *Stream) processFlags(flags uint16) error {
if s.state == streamSYNSent {
s.state = streamEstablished
}
+ asyncNotify(s.establishCh)
s.session.establishStream(s.id)
}
if flags&flagFIN == flagFIN {
@@ -16606,6 +16721,7 @@ func (s *Stream) processFlags(flags uint16) error {
func (s *Stream) notifyWaiting() {
asyncNotify(s.recvNotifyCh)
asyncNotify(s.sendNotifyCh)
+ asyncNotify(s.establishCh)
}
// incrSendWindow updates the size of our send window
diff --git a/go.mod b/go.mod
index 0b7441847..17df17b6f 100644
--- a/go.mod
+++ b/go.mod
@@ -15,7 +15,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/hashicorp/golang-lru v0.5.4
- github.com/hashicorp/yamux v0.0.0-20210316155119-a95892c5f864
+ github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87
github.com/jackc/pgconn v1.12.1
github.com/jackc/pgtype v1.11.0
github.com/jackc/pgx/v4 v4.16.1
diff --git a/go.sum b/go.sum
index 4c2afd61c..fb3daca97 100644
--- a/go.sum
+++ b/go.sum
@@ -618,8 +618,9 @@ github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO
github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ=
github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I=
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
-github.com/hashicorp/yamux v0.0.0-20210316155119-a95892c5f864 h1:Y4V+SFe7d3iH+9pJCoeWIOS5/xBJIFsltS7E+KJSsJY=
github.com/hashicorp/yamux v0.0.0-20210316155119-a95892c5f864/go.mod h1:CtWFDAQgb7dxtzFs4tWbplKIe2jSi3+5vKbgIO0SLnQ=
+github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87 h1:xixZ2bWeofWV68J+x6AzmKuVM/JWCQwkWm6GW/MUR6I=
+github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87/go.mod h1:CtWFDAQgb7dxtzFs4tWbplKIe2jSi3+5vKbgIO0SLnQ=
github.com/hhatto/gorst v0.0.0-20181029133204-ca9f730cac5b h1:Jdu2tbAxkRouSILp2EbposIb8h4gO+2QuZEn3d9sKAc=
github.com/hhatto/gorst v0.0.0-20181029133204-ca9f730cac5b/go.mod h1:HmaZGXHdSwQh1jnUlBGN2BeEYOHACLVGzYOXCbsLvxY=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=