Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-pages.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/golang.org/x/net/http2/server.go')
-rw-r--r--vendor/golang.org/x/net/http2/server.go1366
1 files changed, 981 insertions, 385 deletions
diff --git a/vendor/golang.org/x/net/http2/server.go b/vendor/golang.org/x/net/http2/server.go
index 8206fa79..39ed755a 100644
--- a/vendor/golang.org/x/net/http2/server.go
+++ b/vendor/golang.org/x/net/http2/server.go
@@ -2,17 +2,6 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-// TODO: replace all <-sc.doneServing with reads from the stream's cw
-// instead, and make sure that on close we close all open
-// streams. then remove doneServing?
-
-// TODO: re-audit GOAWAY support. Consider each incoming frame type and
-// whether it should be ignored during graceful shutdown.
-
-// TODO: disconnect idle clients. GFE seems to do 4 minutes. make
-// configurable? or maximum number of idle clients and remove the
-// oldest?
-
// TODO: turn off the serve goroutine when idle, so
// an idle conn only has the readFrames goroutine active. (which could
// also be optimized probably to pin less memory in crypto/tls). This
@@ -44,6 +33,7 @@ import (
"fmt"
"io"
"log"
+ "math"
"net"
"net/http"
"net/textproto"
@@ -114,6 +104,47 @@ type Server struct {
// PermitProhibitedCipherSuites, if true, permits the use of
// cipher suites prohibited by the HTTP/2 spec.
PermitProhibitedCipherSuites bool
+
+ // IdleTimeout specifies how long until idle clients should be
+ // closed with a GOAWAY frame. PING frames are not considered
+ // activity for the purposes of IdleTimeout.
+ IdleTimeout time.Duration
+
+ // MaxUploadBufferPerConnection is the size of the initial flow
+ // control window for each connections. The HTTP/2 spec does not
+ // allow this to be smaller than 65535 or larger than 2^32-1.
+ // If the value is outside this range, a default value will be
+ // used instead.
+ MaxUploadBufferPerConnection int32
+
+ // MaxUploadBufferPerStream is the size of the initial flow control
+ // window for each stream. The HTTP/2 spec does not allow this to
+ // be larger than 2^32-1. If the value is zero or larger than the
+ // maximum, a default value will be used instead.
+ MaxUploadBufferPerStream int32
+
+ // NewWriteScheduler constructs a write scheduler for a connection.
+ // If nil, a default scheduler is chosen.
+ NewWriteScheduler func() WriteScheduler
+
+ // Internal state. This is a pointer (rather than embedded directly)
+ // so that we don't embed a Mutex in this struct, which will make the
+ // struct non-copyable, which might break some callers.
+ state *serverInternalState
+}
+
+func (s *Server) initialConnRecvWindowSize() int32 {
+ if s.MaxUploadBufferPerConnection > initialWindowSize {
+ return s.MaxUploadBufferPerConnection
+ }
+ return 1 << 20
+}
+
+func (s *Server) initialStreamRecvWindowSize() int32 {
+ if s.MaxUploadBufferPerStream > 0 {
+ return s.MaxUploadBufferPerStream
+ }
+ return 1 << 20
}
func (s *Server) maxReadFrameSize() uint32 {
@@ -130,27 +161,74 @@ func (s *Server) maxConcurrentStreams() uint32 {
return defaultMaxStreams
}
+type serverInternalState struct {
+ mu sync.Mutex
+ activeConns map[*serverConn]struct{}
+}
+
+func (s *serverInternalState) registerConn(sc *serverConn) {
+ if s == nil {
+ return // if the Server was used without calling ConfigureServer
+ }
+ s.mu.Lock()
+ s.activeConns[sc] = struct{}{}
+ s.mu.Unlock()
+}
+
+func (s *serverInternalState) unregisterConn(sc *serverConn) {
+ if s == nil {
+ return // if the Server was used without calling ConfigureServer
+ }
+ s.mu.Lock()
+ delete(s.activeConns, sc)
+ s.mu.Unlock()
+}
+
+func (s *serverInternalState) startGracefulShutdown() {
+ if s == nil {
+ return // if the Server was used without calling ConfigureServer
+ }
+ s.mu.Lock()
+ for sc := range s.activeConns {
+ sc.startGracefulShutdown()
+ }
+ s.mu.Unlock()
+}
+
// ConfigureServer adds HTTP/2 support to a net/http Server.
//
// The configuration conf may be nil.
//
// ConfigureServer must be called before s begins serving.
func ConfigureServer(s *http.Server, conf *Server) error {
+ if s == nil {
+ panic("nil *http.Server")
+ }
if conf == nil {
conf = new(Server)
}
+ conf.state = &serverInternalState{activeConns: make(map[*serverConn]struct{})}
+ if err := configureServer18(s, conf); err != nil {
+ return err
+ }
+ if err := configureServer19(s, conf); err != nil {
+ return err
+ }
if s.TLSConfig == nil {
s.TLSConfig = new(tls.Config)
} else if s.TLSConfig.CipherSuites != nil {
// If they already provided a CipherSuite list, return
// an error if it has a bad order or is missing
- // ECDHE_RSA_WITH_AES_128_GCM_SHA256.
- const requiredCipher = tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
+ // ECDHE_RSA_WITH_AES_128_GCM_SHA256 or ECDHE_ECDSA_WITH_AES_128_GCM_SHA256.
haveRequired := false
sawBad := false
for i, cs := range s.TLSConfig.CipherSuites {
- if cs == requiredCipher {
+ switch cs {
+ case tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
+ // Alternative MTI cipher to not discourage ECDSA-only servers.
+ // See http://golang.org/cl/30721 for further information.
+ tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256:
haveRequired = true
}
if isBadCipher(cs) {
@@ -160,7 +238,7 @@ func ConfigureServer(s *http.Server, conf *Server) error {
}
}
if !haveRequired {
- return fmt.Errorf("http2: TLSConfig.CipherSuites is missing HTTP/2-required TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256")
+ return fmt.Errorf("http2: TLSConfig.CipherSuites is missing an HTTP/2-required AES_128_GCM_SHA256 cipher.")
}
}
@@ -183,9 +261,6 @@ func ConfigureServer(s *http.Server, conf *Server) error {
if !haveNPN {
s.TLSConfig.NextProtos = append(s.TLSConfig.NextProtos, NextProtoTLS)
}
- // h2-14 is temporary (as of 2015-03-05) while we wait for all browsers
- // to switch to "h2".
- s.TLSConfig.NextProtos = append(s.TLSConfig.NextProtos, "h2-14")
if s.TLSNextProto == nil {
s.TLSNextProto = map[string]func(*http.Server, *tls.Conn, http.Handler){}
@@ -200,7 +275,6 @@ func ConfigureServer(s *http.Server, conf *Server) error {
})
}
s.TLSNextProto[NextProtoTLS] = protoHandler
- s.TLSNextProto["h2-14"] = protoHandler // temporary; see above.
return nil
}
@@ -254,29 +328,50 @@ func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
defer cancel()
sc := &serverConn{
- srv: s,
- hs: opts.baseConfig(),
- conn: c,
- baseCtx: baseCtx,
- remoteAddrStr: c.RemoteAddr().String(),
- bw: newBufferedWriter(c),
- handler: opts.handler(),
- streams: make(map[uint32]*stream),
- readFrameCh: make(chan readFrameResult),
- wantWriteFrameCh: make(chan frameWriteMsg, 8),
- wroteFrameCh: make(chan frameWriteResult, 1), // buffered; one send in writeFrameAsync
- bodyReadCh: make(chan bodyReadMsg), // buffering doesn't matter either way
- doneServing: make(chan struct{}),
- advMaxStreams: s.maxConcurrentStreams(),
- writeSched: writeScheduler{
- maxFrameSize: initialMaxFrameSize,
- },
- initialWindowSize: initialWindowSize,
- headerTableSize: initialHeaderTableSize,
- serveG: newGoroutineLock(),
- pushEnabled: true,
+ srv: s,
+ hs: opts.baseConfig(),
+ conn: c,
+ baseCtx: baseCtx,
+ remoteAddrStr: c.RemoteAddr().String(),
+ bw: newBufferedWriter(c),
+ handler: opts.handler(),
+ streams: make(map[uint32]*stream),
+ readFrameCh: make(chan readFrameResult),
+ wantWriteFrameCh: make(chan FrameWriteRequest, 8),
+ serveMsgCh: make(chan interface{}, 8),
+ wroteFrameCh: make(chan frameWriteResult, 1), // buffered; one send in writeFrameAsync
+ bodyReadCh: make(chan bodyReadMsg), // buffering doesn't matter either way
+ doneServing: make(chan struct{}),
+ clientMaxStreams: math.MaxUint32, // Section 6.5.2: "Initially, there is no limit to this value"
+ advMaxStreams: s.maxConcurrentStreams(),
+ initialStreamSendWindowSize: initialWindowSize,
+ maxFrameSize: initialMaxFrameSize,
+ headerTableSize: initialHeaderTableSize,
+ serveG: newGoroutineLock(),
+ pushEnabled: true,
+ }
+
+ s.state.registerConn(sc)
+ defer s.state.unregisterConn(sc)
+
+ // The net/http package sets the write deadline from the
+ // http.Server.WriteTimeout during the TLS handshake, but then
+ // passes the connection off to us with the deadline already set.
+ // Write deadlines are set per stream in serverConn.newStream.
+ // Disarm the net.Conn write deadline here.
+ if sc.hs.WriteTimeout != 0 {
+ sc.conn.SetWriteDeadline(time.Time{})
+ }
+
+ if s.NewWriteScheduler != nil {
+ sc.writeSched = s.NewWriteScheduler()
+ } else {
+ sc.writeSched = NewRandomWriteScheduler()
}
+ // These start at the RFC-specified defaults. If there is a higher
+ // configured value for inflow, that will be updated when we send a
+ // WINDOW_UPDATE shortly after sending SETTINGS.
sc.flow.add(initialWindowSize)
sc.inflow.add(initialWindowSize)
sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf)
@@ -311,7 +406,7 @@ func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
// addresses during development.
//
// TODO: optionally enforce? Or enforce at the time we receive
- // a new request, and verify the the ServerName matches the :authority?
+ // a new request, and verify the ServerName matches the :authority?
// But that precludes proxy situations, perhaps.
//
// So for now, do nothing here again.
@@ -356,45 +451,52 @@ type serverConn struct {
handler http.Handler
baseCtx contextContext
framer *Framer
- doneServing chan struct{} // closed when serverConn.serve ends
- readFrameCh chan readFrameResult // written by serverConn.readFrames
- wantWriteFrameCh chan frameWriteMsg // from handlers -> serve
- wroteFrameCh chan frameWriteResult // from writeFrameAsync -> serve, tickles more frame writes
- bodyReadCh chan bodyReadMsg // from handlers -> serve
- testHookCh chan func(int) // code to run on the serve loop
- flow flow // conn-wide (not stream-specific) outbound flow control
- inflow flow // conn-wide inbound flow control
- tlsState *tls.ConnectionState // shared by all handlers, like net/http
+ doneServing chan struct{} // closed when serverConn.serve ends
+ readFrameCh chan readFrameResult // written by serverConn.readFrames
+ wantWriteFrameCh chan FrameWriteRequest // from handlers -> serve
+ wroteFrameCh chan frameWriteResult // from writeFrameAsync -> serve, tickles more frame writes
+ bodyReadCh chan bodyReadMsg // from handlers -> serve
+ serveMsgCh chan interface{} // misc messages & code to send to / run on the serve loop
+ flow flow // conn-wide (not stream-specific) outbound flow control
+ inflow flow // conn-wide inbound flow control
+ tlsState *tls.ConnectionState // shared by all handlers, like net/http
remoteAddrStr string
+ writeSched WriteScheduler
// Everything following is owned by the serve loop; use serveG.check():
- serveG goroutineLock // used to verify funcs are on serve()
- pushEnabled bool
- sawFirstSettings bool // got the initial SETTINGS frame after the preface
- needToSendSettingsAck bool
- unackedSettings int // how many SETTINGS have we sent without ACKs?
- clientMaxStreams uint32 // SETTINGS_MAX_CONCURRENT_STREAMS from client (our PUSH_PROMISE limit)
- advMaxStreams uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised the client
- curOpenStreams uint32 // client's number of open streams
- maxStreamID uint32 // max ever seen
- streams map[uint32]*stream
- initialWindowSize int32
- headerTableSize uint32
- peerMaxHeaderListSize uint32 // zero means unknown (default)
- canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case
- writingFrame bool // started write goroutine but haven't heard back on wroteFrameCh
- needsFrameFlush bool // last frame write wasn't a flush
- writeSched writeScheduler
- inGoAway bool // we've started to or sent GOAWAY
- needToSendGoAway bool // we need to schedule a GOAWAY frame write
- goAwayCode ErrCode
- shutdownTimerCh <-chan time.Time // nil until used
- shutdownTimer *time.Timer // nil until used
- freeRequestBodyBuf []byte // if non-nil, a free initialWindowSize buffer for getRequestBodyBuf
+ serveG goroutineLock // used to verify funcs are on serve()
+ pushEnabled bool
+ sawFirstSettings bool // got the initial SETTINGS frame after the preface
+ needToSendSettingsAck bool
+ unackedSettings int // how many SETTINGS have we sent without ACKs?
+ clientMaxStreams uint32 // SETTINGS_MAX_CONCURRENT_STREAMS from client (our PUSH_PROMISE limit)
+ advMaxStreams uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised the client
+ curClientStreams uint32 // number of open streams initiated by the client
+ curPushedStreams uint32 // number of open streams initiated by server push
+ maxClientStreamID uint32 // max ever seen from client (odd), or 0 if there have been no client requests
+ maxPushPromiseID uint32 // ID of the last push promise (even), or 0 if there have been no pushes
+ streams map[uint32]*stream
+ initialStreamSendWindowSize int32
+ maxFrameSize int32
+ headerTableSize uint32
+ peerMaxHeaderListSize uint32 // zero means unknown (default)
+ canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case
+ writingFrame bool // started writing a frame (on serve goroutine or separate)
+ writingFrameAsync bool // started a frame on its own goroutine but haven't heard back on wroteFrameCh
+ needsFrameFlush bool // last frame write wasn't a flush
+ inGoAway bool // we've started to or sent GOAWAY
+ inFrameScheduleLoop bool // whether we're in the scheduleFrameWrite loop
+ needToSendGoAway bool // we need to schedule a GOAWAY frame write
+ goAwayCode ErrCode
+ shutdownTimer *time.Timer // nil until used
+ idleTimer *time.Timer // nil if unused
// Owned by the writeFrameAsync goroutine:
headerWriteBuf bytes.Buffer
hpackEncoder *hpack.Encoder
+
+ // Used by startGracefulShutdown.
+ shutdownOnce sync.Once
}
func (sc *serverConn) maxHeaderListSize() uint32 {
@@ -409,6 +511,11 @@ func (sc *serverConn) maxHeaderListSize() uint32 {
return uint32(n + typicalHeaders*perFieldOverhead)
}
+func (sc *serverConn) curOpenStreams() uint32 {
+ sc.serveG.check()
+ return sc.curClientStreams + sc.curPushedStreams
+}
+
// stream represents a stream. This is the minimal metadata needed by
// the serve goroutine. Most of the actual stream state is owned by
// the http.Handler's goroutine in the responseWriter. Because the
@@ -434,11 +541,10 @@ type stream struct {
numTrailerValues int64
weight uint8
state streamState
- sentReset bool // only true once detached from streams map
- gotReset bool // only true once detacted from streams map
- gotTrailerHeader bool // HEADER frame for trailers was seen
- wroteHeaders bool // whether we wrote headers (not status 100)
- reqBuf []byte
+ resetQueued bool // RST_STREAM queued for write; set by sc.resetStream
+ gotTrailerHeader bool // HEADER frame for trailers was seen
+ wroteHeaders bool // whether we wrote headers (not status 100)
+ writeDeadline *time.Timer // nil if unused
trailer http.Header // accumulated trailers
reqTrailer http.Header // handler's Request.Trailer
@@ -453,7 +559,7 @@ func (sc *serverConn) HeaderEncoder() (*hpack.Encoder, *bytes.Buffer) {
func (sc *serverConn) state(streamID uint32) (streamState, *stream) {
sc.serveG.check()
- // http://http2.github.io/http2-spec/#rfc.section.5.1
+ // http://tools.ietf.org/html/rfc7540#section-5.1
if st, ok := sc.streams[streamID]; ok {
return st.state, st
}
@@ -463,8 +569,14 @@ func (sc *serverConn) state(streamID uint32) (streamState, *stream) {
// a client sends a HEADERS frame on stream 7 without ever sending a
// frame on stream 5, then stream 5 transitions to the "closed"
// state when the first frame for stream 7 is sent or received."
- if streamID <= sc.maxStreamID {
- return stateClosed, nil
+ if streamID%2 == 1 {
+ if streamID <= sc.maxClientStreamID {
+ return stateClosed, nil
+ }
+ } else {
+ if streamID <= sc.maxPushPromiseID {
+ return stateClosed, nil
+ }
}
return stateIdle, nil
}
@@ -540,7 +652,7 @@ func (sc *serverConn) condlogf(err error, format string, args ...interface{}) {
if err == nil {
return
}
- if err == io.EOF || err == io.ErrUnexpectedEOF || isClosedConnError(err) {
+ if err == io.EOF || err == io.ErrUnexpectedEOF || isClosedConnError(err) || err == errPrefaceTimeout {
// Boring, expected errors.
sc.vlogf(format, args...)
} else {
@@ -603,17 +715,17 @@ func (sc *serverConn) readFrames() {
// frameWriteResult is the message passed from writeFrameAsync to the serve goroutine.
type frameWriteResult struct {
- wm frameWriteMsg // what was written (or attempted)
- err error // result of the writeFrame call
+ wr FrameWriteRequest // what was written (or attempted)
+ err error // result of the writeFrame call
}
// writeFrameAsync runs in its own goroutine and writes a single frame
// and then reports when it's done.
// At most one goroutine can be running writeFrameAsync at a time per
// serverConn.
-func (sc *serverConn) writeFrameAsync(wm frameWriteMsg) {
- err := wm.write.writeFrame(sc)
- sc.wroteFrameCh <- frameWriteResult{wm, err}
+func (sc *serverConn) writeFrameAsync(wr FrameWriteRequest) {
+ err := wr.write.writeFrame(sc)
+ sc.wroteFrameCh <- frameWriteResult{wr, err}
}
func (sc *serverConn) closeAllStreamsOnConnClose() {
@@ -657,40 +769,53 @@ func (sc *serverConn) serve() {
sc.vlogf("http2: server connection from %v on %p", sc.conn.RemoteAddr(), sc.hs)
}
- sc.writeFrame(frameWriteMsg{
+ sc.writeFrame(FrameWriteRequest{
write: writeSettings{
{SettingMaxFrameSize, sc.srv.maxReadFrameSize()},
{SettingMaxConcurrentStreams, sc.advMaxStreams},
{SettingMaxHeaderListSize, sc.maxHeaderListSize()},
-
- // TODO: more actual settings, notably
- // SettingInitialWindowSize, but then we also
- // want to bump up the conn window size the
- // same amount here right after the settings
+ {SettingInitialWindowSize, uint32(sc.srv.initialStreamRecvWindowSize())},
},
})
sc.unackedSettings++
+ // Each connection starts with intialWindowSize inflow tokens.
+ // If a higher value is configured, we add more tokens.
+ if diff := sc.srv.initialConnRecvWindowSize() - initialWindowSize; diff > 0 {
+ sc.sendWindowUpdate(nil, int(diff))
+ }
+
if err := sc.readPreface(); err != nil {
sc.condlogf(err, "http2: server: error reading preface from client %v: %v", sc.conn.RemoteAddr(), err)
return
}
// Now that we've got the preface, get us out of the
- // "StateNew" state. We can't go directly to idle, though.
+ // "StateNew" state. We can't go directly to idle, though.
// Active means we read some data and anticipate a request. We'll
// do another Active when we get a HEADERS frame.
sc.setConnState(http.StateActive)
sc.setConnState(http.StateIdle)
+ if sc.srv.IdleTimeout != 0 {
+ sc.idleTimer = time.AfterFunc(sc.srv.IdleTimeout, sc.onIdleTimer)
+ defer sc.idleTimer.Stop()
+ }
+
go sc.readFrames() // closed by defer sc.conn.Close above
- settingsTimer := time.NewTimer(firstSettingsTimeout)
+ settingsTimer := time.AfterFunc(firstSettingsTimeout, sc.onSettingsTimer)
+ defer settingsTimer.Stop()
+
loopNum := 0
for {
loopNum++
select {
- case wm := <-sc.wantWriteFrameCh:
- sc.writeFrame(wm)
+ case wr := <-sc.wantWriteFrameCh:
+ if se, ok := wr.write.(StreamError); ok {
+ sc.resetStream(se)
+ break
+ }
+ sc.writeFrame(wr)
case res := <-sc.wroteFrameCh:
sc.wroteFrame(res)
case res := <-sc.readFrameCh:
@@ -698,26 +823,85 @@ func (sc *serverConn) serve() {
return
}
res.readMore()
- if settingsTimer.C != nil {
+ if settingsTimer != nil {
settingsTimer.Stop()
- settingsTimer.C = nil
+ settingsTimer = nil
}
case m := <-sc.bodyReadCh:
sc.noteBodyRead(m.st, m.n)
- case <-settingsTimer.C:
- sc.logf("timeout waiting for SETTINGS frames from %v", sc.conn.RemoteAddr())
- return
- case <-sc.shutdownTimerCh:
- sc.vlogf("GOAWAY close timer fired; closing conn from %v", sc.conn.RemoteAddr())
- return
- case fn := <-sc.testHookCh:
- fn(loopNum)
+ case msg := <-sc.serveMsgCh:
+ switch v := msg.(type) {
+ case func(int):
+ v(loopNum) // for testing
+ case *serverMessage:
+ switch v {
+ case settingsTimerMsg:
+ sc.logf("timeout waiting for SETTINGS frames from %v", sc.conn.RemoteAddr())
+ return
+ case idleTimerMsg:
+ sc.vlogf("connection is idle")
+ sc.goAway(ErrCodeNo)
+ case shutdownTimerMsg:
+ sc.vlogf("GOAWAY close timer fired; closing conn from %v", sc.conn.RemoteAddr())
+ return
+ case gracefulShutdownMsg:
+ sc.startGracefulShutdownInternal()
+ default:
+ panic("unknown timer")
+ }
+ case *startPushRequest:
+ sc.startPush(v)
+ default:
+ panic(fmt.Sprintf("unexpected type %T", v))
+ }
}
+
+ // Start the shutdown timer after sending a GOAWAY. When sending GOAWAY
+ // with no error code (graceful shutdown), don't start the timer until
+ // all open streams have been completed.
+ sentGoAway := sc.inGoAway && !sc.needToSendGoAway && !sc.writingFrame
+ gracefulShutdownComplete := sc.goAwayCode == ErrCodeNo && sc.curOpenStreams() == 0
+ if sentGoAway && sc.shutdownTimer == nil && (sc.goAwayCode != ErrCodeNo || gracefulShutdownComplete) {
+ sc.shutDownIn(goAwayTimeout)
+ }
+ }
+}
+
+func (sc *serverConn) awaitGracefulShutdown(sharedCh <-chan struct{}, privateCh chan struct{}) {
+ select {
+ case <-sc.doneServing:
+ case <-sharedCh:
+ close(privateCh)
+ }
+}
+
+type serverMessage int
+
+// Message values sent to serveMsgCh.
+var (
+ settingsTimerMsg = new(serverMessage)
+ idleTimerMsg = new(serverMessage)
+ shutdownTimerMsg = new(serverMessage)
+ gracefulShutdownMsg = new(serverMessage)
+)
+
+func (sc *serverConn) onSettingsTimer() { sc.sendServeMsg(settingsTimerMsg) }
+func (sc *serverConn) onIdleTimer() { sc.sendServeMsg(idleTimerMsg) }
+func (sc *serverConn) onShutdownTimer() { sc.sendServeMsg(shutdownTimerMsg) }
+
+func (sc *serverConn) sendServeMsg(msg interface{}) {
+ sc.serveG.checkNotOn() // NOT
+ select {
+ case sc.serveMsgCh <- msg:
+ case <-sc.doneServing:
}
}
-// readPreface reads the ClientPreface greeting from the peer
-// or returns an error on timeout or an invalid greeting.
+var errPrefaceTimeout = errors.New("timeout waiting for client preface")
+
+// readPreface reads the ClientPreface greeting from the peer or
+// returns errPrefaceTimeout on timeout, or an error if the greeting
+// is invalid.
func (sc *serverConn) readPreface() error {
errc := make(chan error, 1)
go func() {
@@ -735,7 +919,7 @@ func (sc *serverConn) readPreface() error {
defer timer.Stop()
select {
case <-timer.C:
- return errors.New("timeout waiting for client preface")
+ return errPrefaceTimeout
case err := <-errc:
if err == nil {
if VerboseLogs {
@@ -760,7 +944,7 @@ func (sc *serverConn) writeDataFromHandler(stream *stream, data []byte, endStrea
ch := errChanPool.Get().(chan error)
writeArg := writeDataPool.Get().(*writeData)
*writeArg = writeData{stream.id, data, endStream}
- err := sc.writeFrameFromHandler(frameWriteMsg{
+ err := sc.writeFrameFromHandler(FrameWriteRequest{
write: writeArg,
stream: stream,
done: ch,
@@ -796,17 +980,17 @@ func (sc *serverConn) writeDataFromHandler(stream *stream, data []byte, endStrea
return err
}
-// writeFrameFromHandler sends wm to sc.wantWriteFrameCh, but aborts
+// writeFrameFromHandler sends wr to sc.wantWriteFrameCh, but aborts
// if the connection has gone away.
//
// This must not be run from the serve goroutine itself, else it might
// deadlock writing to sc.wantWriteFrameCh (which is only mildly
// buffered and is read by serve itself). If you're on the serve
// goroutine, call writeFrame instead.
-func (sc *serverConn) writeFrameFromHandler(wm frameWriteMsg) error {
+func (sc *serverConn) writeFrameFromHandler(wr FrameWriteRequest) error {
sc.serveG.checkNotOn() // NOT
select {
- case sc.wantWriteFrameCh <- wm:
+ case sc.wantWriteFrameCh <- wr:
return nil
case <-sc.doneServing:
// Serve loop is gone.
@@ -823,55 +1007,103 @@ func (sc *serverConn) writeFrameFromHandler(wm frameWriteMsg) error {
// make it onto the wire
//
// If you're not on the serve goroutine, use writeFrameFromHandler instead.
-func (sc *serverConn) writeFrame(wm frameWriteMsg) {
+func (sc *serverConn) writeFrame(wr FrameWriteRequest) {
sc.serveG.check()
+ // If true, wr will not be written and wr.done will not be signaled.
var ignoreWrite bool
+ // We are not allowed to write frames on closed streams. RFC 7540 Section
+ // 5.1.1 says: "An endpoint MUST NOT send frames other than PRIORITY on
+ // a closed stream." Our server never sends PRIORITY, so that exception
+ // does not apply.
+ //
+ // The serverConn might close an open stream while the stream's handler
+ // is still running. For example, the server might close a stream when it
+ // receives bad data from the client. If this happens, the handler might
+ // attempt to write a frame after the stream has been closed (since the
+ // handler hasn't yet been notified of the close). In this case, we simply
+ // ignore the frame. The handler will notice that the stream is closed when
+ // it waits for the frame to be written.
+ //
+ // As an exception to this rule, we allow sending RST_STREAM after close.
+ // This allows us to immediately reject new streams without tracking any
+ // state for those streams (except for the queued RST_STREAM frame). This
+ // may result in duplicate RST_STREAMs in some cases, but the client should
+ // ignore those.
+ if wr.StreamID() != 0 {
+ _, isReset := wr.write.(StreamError)
+ if state, _ := sc.state(wr.StreamID()); state == stateClosed && !isReset {
+ ignoreWrite = true
+ }
+ }
+
// Don't send a 100-continue response if we've already sent headers.
// See golang.org/issue/14030.
- switch wm.write.(type) {
+ switch wr.write.(type) {
case *writeResHeaders:
- wm.stream.wroteHeaders = true
+ wr.stream.wroteHeaders = true
case write100ContinueHeadersFrame:
- if wm.stream.wroteHeaders {
+ if wr.stream.wroteHeaders {
+ // We do not need to notify wr.done because this frame is
+ // never written with wr.done != nil.
+ if wr.done != nil {
+ panic("wr.done != nil for write100ContinueHeadersFrame")
+ }
ignoreWrite = true
}
}
if !ignoreWrite {
- sc.writeSched.add(wm)
+ sc.writeSched.Push(wr)
}
sc.scheduleFrameWrite()
}
-// startFrameWrite starts a goroutine to write wm (in a separate
+// startFrameWrite starts a goroutine to write wr (in a separate
// goroutine since that might block on the network), and updates the
-// serve goroutine's state about the world, updated from info in wm.
-func (sc *serverConn) startFrameWrite(wm frameWriteMsg) {
+// serve goroutine's state about the world, updated from info in wr.
+func (sc *serverConn) startFrameWrite(wr FrameWriteRequest) {
sc.serveG.check()
if sc.writingFrame {
panic("internal error: can only be writing one frame at a time")
}
- st := wm.stream
+ st := wr.stream
if st != nil {
switch st.state {
case stateHalfClosedLocal:
- panic("internal error: attempt to send frame on half-closed-local stream")
- case stateClosed:
- if st.sentReset || st.gotReset {
- // Skip this frame.
- sc.scheduleFrameWrite()
- return
+ switch wr.write.(type) {
+ case StreamError, handlerPanicRST, writeWindowUpdate:
+ // RFC 7540 Section 5.1 allows sending RST_STREAM, PRIORITY, and WINDOW_UPDATE
+ // in this state. (We never send PRIORITY from the server, so that is not checked.)
+ default:
+ panic(fmt.Sprintf("internal error: attempt to send frame on a half-closed-local stream: %v", wr))
}
- panic(fmt.Sprintf("internal error: attempt to send a write %v on a closed stream", wm))
+ case stateClosed:
+ panic(fmt.Sprintf("internal error: attempt to send frame on a closed stream: %v", wr))
+ }
+ }
+ if wpp, ok := wr.write.(*writePushPromise); ok {
+ var err error
+ wpp.promisedID, err = wpp.allocatePromisedID()
+ if err != nil {
+ sc.writingFrameAsync = false
+ wr.replyToWriter(err)
+ return
}
}
sc.writingFrame = true
sc.needsFrameFlush = true
- go sc.writeFrameAsync(wm)
+ if wr.write.staysWithinBuffer(sc.bw.Available()) {
+ sc.writingFrameAsync = false
+ err := wr.write.writeFrame(sc)
+ sc.wroteFrame(frameWriteResult{wr, err})
+ } else {
+ sc.writingFrameAsync = true
+ go sc.writeFrameAsync(wr)
+ }
}
// errHandlerPanicked is the error given to any callers blocked in a read from
@@ -887,27 +1119,12 @@ func (sc *serverConn) wroteFrame(res frameWriteResult) {
panic("internal error: expected to be already writing a frame")
}
sc.writingFrame = false
+ sc.writingFrameAsync = false
- wm := res.wm
- st := wm.stream
-
- closeStream := endsStream(wm.write)
+ wr := res.wr
- if _, ok := wm.write.(handlerPanicRST); ok {
- sc.closeStream(st, errHandlerPanicked)
- }
-
- // Reply (if requested) to the blocked ServeHTTP goroutine.
- if ch := wm.done; ch != nil {
- select {
- case ch <- res.err:
- default:
- panic(fmt.Sprintf("unbuffered done channel passed in for type %T", wm.write))
- }
- }
- wm.write = nil // prevent use (assume it's tainted after wm.done send)
-
- if closeStream {
+ if writeEndsStream(wr.write) {
+ st := wr.stream
if st == nil {
panic("internal error: expecting non-nil stream")
}
@@ -916,19 +1133,37 @@ func (sc *serverConn) wroteFrame(res frameWriteResult) {
// Here we would go to stateHalfClosedLocal in
// theory, but since our handler is done and
// the net/http package provides no mechanism
- // for finishing writing to a ResponseWriter
- // while still reading data (see possible TODO
- // at top of this file), we go into closed
- // state here anyway, after telling the peer
- // we're hanging up on them.
- st.state = stateHalfClosedLocal // won't last long, but necessary for closeStream via resetStream
- errCancel := streamError(st.id, ErrCodeCancel)
- sc.resetStream(errCancel)
+ // for closing a ResponseWriter while still
+ // reading data (see possible TODO at top of
+ // this file), we go into closed state here
+ // anyway, after telling the peer we're
+ // hanging up on them. We'll transition to
+ // stateClosed after the RST_STREAM frame is
+ // written.
+ st.state = stateHalfClosedLocal
+ // Section 8.1: a server MAY request that the client abort
+ // transmission of a request without error by sending a
+ // RST_STREAM with an error code of NO_ERROR after sending
+ // a complete response.
+ sc.resetStream(streamError(st.id, ErrCodeNo))
case stateHalfClosedRemote:
sc.closeStream(st, errHandlerComplete)
}
+ } else {
+ switch v := wr.write.(type) {
+ case StreamError:
+ // st may be unknown if the RST_STREAM was generated to reject bad input.
+ if st, ok := sc.streams[v.StreamID]; ok {
+ sc.closeStream(st, v)
+ }
+ case handlerPanicRST:
+ sc.closeStream(wr.stream, errHandlerPanicked)
+ }
}
+ // Reply (if requested) to unblock the ServeHTTP goroutine.
+ wr.replyToWriter(res.err)
+
sc.scheduleFrameWrite()
}
@@ -946,35 +1181,72 @@ func (sc *serverConn) wroteFrame(res frameWriteResult) {
// flush the write buffer.
func (sc *serverConn) scheduleFrameWrite() {
sc.serveG.check()
- if sc.writingFrame {
- return
- }
- if sc.needToSendGoAway {
- sc.needToSendGoAway = false
- sc.startFrameWrite(frameWriteMsg{
- write: &writeGoAway{
- maxStreamID: sc.maxStreamID,
- code: sc.goAwayCode,
- },
- })
- return
- }
- if sc.needToSendSettingsAck {
- sc.needToSendSettingsAck = false
- sc.startFrameWrite(frameWriteMsg{write: writeSettingsAck{}})
+ if sc.writingFrame || sc.inFrameScheduleLoop {
return
}
- if !sc.inGoAway {
- if wm, ok := sc.writeSched.take(); ok {
- sc.startFrameWrite(wm)
- return
+ sc.inFrameScheduleLoop = true
+ for !sc.writingFrameAsync {
+ if sc.needToSendGoAway {
+ sc.needToSendGoAway = false
+ sc.startFrameWrite(FrameWriteRequest{
+ write: &writeGoAway{
+ maxStreamID: sc.maxClientStreamID,
+ code: sc.goAwayCode,
+ },
+ })
+ continue
}
+ if sc.needToSendSettingsAck {
+ sc.needToSendSettingsAck = false
+ sc.startFrameWrite(FrameWriteRequest{write: writeSettingsAck{}})
+ continue
+ }
+ if !sc.inGoAway || sc.goAwayCode == ErrCodeNo {
+ if wr, ok := sc.writeSched.Pop(); ok {
+ sc.startFrameWrite(wr)
+ continue
+ }
+ }
+ if sc.needsFrameFlush {
+ sc.startFrameWrite(FrameWriteRequest{write: flushFrameWriter{}})
+ sc.needsFrameFlush = false // after startFrameWrite, since it sets this true
+ continue
+ }
+ break
}
- if sc.needsFrameFlush {
- sc.startFrameWrite(frameWriteMsg{write: flushFrameWriter{}})
- sc.needsFrameFlush = false // after startFrameWrite, since it sets this true
- return
- }
+ sc.inFrameScheduleLoop = false
+}
+
+// startGracefulShutdown gracefully shuts down a connection. This
+// sends GOAWAY with ErrCodeNo to tell the client we're gracefully
+// shutting down. The connection isn't closed until all current
+// streams are done.
+//
+// startGracefulShutdown returns immediately; it does not wait until
+// the connection has shut down.
+func (sc *serverConn) startGracefulShutdown() {
+ sc.serveG.checkNotOn() // NOT
+ sc.shutdownOnce.Do(func() { sc.sendServeMsg(gracefulShutdownMsg) })
+}
+
+// After sending GOAWAY, the connection will close after goAwayTimeout.
+// If we close the connection immediately after sending GOAWAY, there may
+// be unsent data in our kernel receive buffer, which will cause the kernel
+// to send a TCP RST on close() instead of a FIN. This RST will abort the
+// connection immediately, whether or not the client had received the GOAWAY.
+//
+// Ideally we should delay for at least 1 RTT + epsilon so the client has
+// a chance to read the GOAWAY and stop sending messages. Measuring RTT
+// is hard, so we approximate with 1 second. See golang.org/issue/18701.
+//
+// This is a var so it can be shorter in tests, where all requests uses the
+// loopback interface making the expected RTT very small.
+//
+// TODO: configurable?
+var goAwayTimeout = 1 * time.Second
+
+func (sc *serverConn) startGracefulShutdownInternal() {
+ sc.goAway(ErrCodeNo)
}
func (sc *serverConn) goAway(code ErrCode) {
@@ -982,12 +1254,6 @@ func (sc *serverConn) goAway(code ErrCode) {
if sc.inGoAway {
return
}
- if code != ErrCodeNo {
- sc.shutDownIn(250 * time.Millisecond)
- } else {
- // TODO: configurable
- sc.shutDownIn(1 * time.Second)
- }
sc.inGoAway = true
sc.needToSendGoAway = true
sc.goAwayCode = code
@@ -996,16 +1262,14 @@ func (sc *serverConn) goAway(code ErrCode) {
func (sc *serverConn) shutDownIn(d time.Duration) {
sc.serveG.check()
- sc.shutdownTimer = time.NewTimer(d)
- sc.shutdownTimerCh = sc.shutdownTimer.C
+ sc.shutdownTimer = time.AfterFunc(d, sc.onShutdownTimer)
}
func (sc *serverConn) resetStream(se StreamError) {
sc.serveG.check()
- sc.writeFrame(frameWriteMsg{write: se})
+ sc.writeFrame(FrameWriteRequest{write: se})
if st, ok := sc.streams[se.StreamID]; ok {
- st.sentReset = true
- sc.closeStream(st, se)
+ st.resetQueued = true
}
}
@@ -1090,6 +1354,8 @@ func (sc *serverConn) processFrame(f Frame) error {
return sc.processResetStream(f)
case *PriorityFrame:
return sc.processPriority(f)
+ case *GoAwayFrame:
+ return sc.processGoAway(f)
case *PushPromiseFrame:
// A client cannot push. Thus, servers MUST treat the receipt of a PUSH_PROMISE
// frame as a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
@@ -1115,7 +1381,10 @@ func (sc *serverConn) processPing(f *PingFrame) error {
// PROTOCOL_ERROR."
return ConnectionError(ErrCodeProtocol)
}
- sc.writeFrame(frameWriteMsg{write: writePingAck{f}})
+ if sc.inGoAway && sc.goAwayCode != ErrCodeNo {
+ return nil
+ }
+ sc.writeFrame(FrameWriteRequest{write: writePingAck{f}})
return nil
}
@@ -1123,7 +1392,14 @@ func (sc *serverConn) processWindowUpdate(f *WindowUpdateFrame) error {
sc.serveG.check()
switch {
case f.StreamID != 0: // stream-level flow control
- st := sc.streams[f.StreamID]
+ state, st := sc.state(f.StreamID)
+ if state == stateIdle {
+ // Section 5.1: "Receiving any frame other than HEADERS
+ // or PRIORITY on a stream in this state MUST be
+ // treated as a connection error (Section 5.4.1) of
+ // type PROTOCOL_ERROR."
+ return ConnectionError(ErrCodeProtocol)
+ }
if st == nil {
// "WINDOW_UPDATE can be sent by a peer that has sent a
// frame bearing the END_STREAM flag. This means that a
@@ -1157,7 +1433,6 @@ func (sc *serverConn) processResetStream(f *RSTStreamFrame) error {
return ConnectionError(ErrCodeProtocol)
}
if st != nil {
- st.gotReset = true
st.cancelCtx()
sc.closeStream(st, streamError(f.StreamID, f.ErrCode))
}
@@ -1170,11 +1445,24 @@ func (sc *serverConn) closeStream(st *stream, err error) {
panic(fmt.Sprintf("invariant; can't close stream in state %v", st.state))
}
st.state = stateClosed
- sc.curOpenStreams--
- if sc.curOpenStreams == 0 {
- sc.setConnState(http.StateIdle)
+ if st.writeDeadline != nil {
+ st.writeDeadline.Stop()
+ }
+ if st.isPushed() {
+ sc.curPushedStreams--
+ } else {
+ sc.curClientStreams--
}
delete(sc.streams, st.id)
+ if len(sc.streams) == 0 {
+ sc.setConnState(http.StateIdle)
+ if sc.srv.IdleTimeout != 0 {
+ sc.idleTimer.Reset(sc.srv.IdleTimeout)
+ }
+ if h1ServerKeepAlivesDisabled(sc.hs) {
+ sc.startGracefulShutdownInternal()
+ }
+ }
if p := st.body; p != nil {
// Return any buffered unread bytes worth of conn-level flow control.
// See golang.org/issue/16481
@@ -1183,19 +1471,7 @@ func (sc *serverConn) closeStream(st *stream, err error) {
p.CloseWithError(err)
}
st.cw.Close() // signals Handler's CloseNotifier, unblocks writes, etc
- sc.writeSched.forgetStream(st.id)
- if st.reqBuf != nil {
- // Stash this request body buffer (64k) away for reuse
- // by a future POST/PUT/etc.
- //
- // TODO(bradfitz): share on the server? sync.Pool?
- // Server requires locks and might hurt contention.
- // sync.Pool might work, or might be worse, depending
- // on goroutine CPU migrations. (get and put on
- // separate CPUs). Maybe a mix of strategies. But
- // this is an easy win for now.
- sc.freeRequestBodyBuf = st.reqBuf
- }
+ sc.writeSched.CloseStream(st.id)
}
func (sc *serverConn) processSettings(f *SettingsFrame) error {
@@ -1237,7 +1513,7 @@ func (sc *serverConn) processSetting(s Setting) error {
case SettingInitialWindowSize:
return sc.processSettingInitialWindowSize(s.Val)
case SettingMaxFrameSize:
- sc.writeSched.maxFrameSize = s.Val
+ sc.maxFrameSize = int32(s.Val) // the maximum valid s.Val is < 2^31
case SettingMaxHeaderListSize:
sc.peerMaxHeaderListSize = s.Val
default:
@@ -1262,9 +1538,9 @@ func (sc *serverConn) processSettingInitialWindowSize(val uint32) error {
// adjust the size of all stream flow control windows that it
// maintains by the difference between the new value and the
// old value."
- old := sc.initialWindowSize
- sc.initialWindowSize = int32(val)
- growth := sc.initialWindowSize - old // may be negative
+ old := sc.initialStreamSendWindowSize
+ sc.initialStreamSendWindowSize = int32(val)
+ growth := int32(val) - old // may be negative
for _, st := range sc.streams {
if !st.flow.add(growth) {
// 6.9.2 Initial Flow Control Window Size
@@ -1281,14 +1557,24 @@ func (sc *serverConn) processSettingInitialWindowSize(val uint32) error {
func (sc *serverConn) processData(f *DataFrame) error {
sc.serveG.check()
+ if sc.inGoAway && sc.goAwayCode != ErrCodeNo {
+ return nil
+ }
data := f.Data()
// "If a DATA frame is received whose stream is not in "open"
// or "half closed (local)" state, the recipient MUST respond
// with a stream error (Section 5.4.2) of type STREAM_CLOSED."
id := f.Header().StreamID
- st, ok := sc.streams[id]
- if !ok || st.state != stateOpen || st.gotTrailerHeader {
+ state, st := sc.state(id)
+ if id == 0 || state == stateIdle {
+ // Section 5.1: "Receiving any frame other than HEADERS
+ // or PRIORITY on a stream in this state MUST be
+ // treated as a connection error (Section 5.4.1) of
+ // type PROTOCOL_ERROR."
+ return ConnectionError(ErrCodeProtocol)
+ }
+ if st == nil || state != stateOpen || st.gotTrailerHeader || st.resetQueued {
// This includes sending a RST_STREAM if the stream is
// in stateHalfClosedLocal (which currently means that
// the http.Handler returned, so it's done reading &
@@ -1308,6 +1594,10 @@ func (sc *serverConn) processData(f *DataFrame) error {
sc.inflow.take(int32(f.Length))
sc.sendWindowUpdate(nil, int(f.Length)) // conn-level
+ if st != nil && st.resetQueued {
+ // Already have a stream error in flight. Don't send another.
+ return nil
+ }
return streamError(id, ErrCodeStreamClosed)
}
if st.body == nil {
@@ -1350,6 +1640,25 @@ func (sc *serverConn) processData(f *DataFrame) error {
return nil
}
+func (sc *serverConn) processGoAway(f *GoAwayFrame) error {
+ sc.serveG.check()
+ if f.ErrCode != ErrCodeNo {
+ sc.logf("http2: received GOAWAY %+v, starting graceful shutdown", f)
+ } else {
+ sc.vlogf("http2: received GOAWAY %+v, starting graceful shutdown", f)
+ }
+ sc.startGracefulShutdownInternal()
+ // http://tools.ietf.org/html/rfc7540#section-6.8
+ // We should not create any new streams, which means we should disable push.
+ sc.pushEnabled = false
+ return nil
+}
+
+// isPushed reports whether the stream is server-initiated.
+func (st *stream) isPushed() bool {
+ return st.id%2 == 0
+}
+
// endStream closes a Request.Body's pipe. It is called when a DATA
// frame says a request body is over (or after trailers).
func (st *stream) endStream() {
@@ -1377,14 +1686,20 @@ func (st *stream) copyTrailersToHandlerRequest() {
}
}
+// onWriteTimeout is run on its own goroutine (from time.AfterFunc)
+// when the stream's WriteTimeout has fired.
+func (st *stream) onWriteTimeout() {
+ st.sc.writeFrameFromHandler(FrameWriteRequest{write: streamError(st.id, ErrCodeInternal)})
+}
+
func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
sc.serveG.check()
- id := f.Header().StreamID
+ id := f.StreamID
if sc.inGoAway {
// Ignore.
return nil
}
- // http://http2.github.io/http2-spec/#rfc.section.5.1.1
+ // http://tools.ietf.org/html/rfc7540#section-5.1.1
// Streams initiated by a client MUST use odd-numbered stream
// identifiers. [...] An endpoint that receives an unexpected
// stream identifier MUST respond with a connection error
@@ -1396,8 +1711,12 @@ func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
// send a trailer for an open one. If we already have a stream
// open, let it process its own HEADERS frame (trailers at this
// point, if it's valid).
- st := sc.streams[f.Header().StreamID]
- if st != nil {
+ if st := sc.streams[f.StreamID]; st != nil {
+ if st.resetQueued {
+ // We're sending RST_STREAM to close the stream, so don't bother
+ // processing this frame.
+ return nil
+ }
return st.processTrailerHeaders(f)
}
@@ -1406,54 +1725,45 @@ func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
// endpoint has opened or reserved. [...] An endpoint that
// receives an unexpected stream identifier MUST respond with
// a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
- if id <= sc.maxStreamID {
+ if id <= sc.maxClientStreamID {
return ConnectionError(ErrCodeProtocol)
}
- sc.maxStreamID = id
+ sc.maxClientStreamID = id
- ctx, cancelCtx := contextWithCancel(sc.baseCtx)
- st = &stream{
- sc: sc,
- id: id,
- state: stateOpen,
- ctx: ctx,
- cancelCtx: cancelCtx,
- }
- if f.StreamEnded() {
- st.state = stateHalfClosedRemote
+ if sc.idleTimer != nil {
+ sc.idleTimer.Stop()
}
- st.cw.Init()
-
- st.flow.conn = &sc.flow // link to conn-level counter
- st.flow.add(sc.initialWindowSize)
- st.inflow.conn = &sc.inflow // link to conn-level counter
- st.inflow.add(initialWindowSize) // TODO: update this when we send a higher initial window size in the initial settings
- sc.streams[id] = st
- if f.HasPriority() {
- adjustStreamPriority(sc.streams, st.id, f.Priority)
- }
- sc.curOpenStreams++
- if sc.curOpenStreams == 1 {
- sc.setConnState(http.StateActive)
- }
- if sc.curOpenStreams > sc.advMaxStreams {
- // "Endpoints MUST NOT exceed the limit set by their
- // peer. An endpoint that receives a HEADERS frame
- // that causes their advertised concurrent stream
- // limit to be exceeded MUST treat this as a stream
- // error (Section 5.4.2) of type PROTOCOL_ERROR or
- // REFUSED_STREAM."
+ // http://tools.ietf.org/html/rfc7540#section-5.1.2
+ // [...] Endpoints MUST NOT exceed the limit set by their peer. An
+ // endpoint that receives a HEADERS frame that causes their
+ // advertised concurrent stream limit to be exceeded MUST treat
+ // this as a stream error (Section 5.4.2) of type PROTOCOL_ERROR
+ // or REFUSED_STREAM.
+ if sc.curClientStreams+1 > sc.advMaxStreams {
if sc.unackedSettings == 0 {
// They should know better.
- return streamError(st.id, ErrCodeProtocol)
+ return streamError(id, ErrCodeProtocol)
}
// Assume it's a network race, where they just haven't
// received our last SETTINGS update. But actually
// this can't happen yet, because we don't yet provide
// a way for users to adjust server parameters at
// runtime.
- return streamError(st.id, ErrCodeRefusedStream)
+ return streamError(id, ErrCodeRefusedStream)
+ }
+
+ initialState := stateOpen
+ if f.StreamEnded() {
+ initialState = stateHalfClosedRemote
+ }
+ st := sc.newStream(id, 0, initialState)
+
+ if f.HasPriority() {
+ if err := checkPriority(f.StreamID, f.Priority); err != nil {
+ return err
+ }
+ sc.writeSched.AdjustStream(st.id, f.Priority)
}
rw, req, err := sc.newWriterAndRequest(st, f)
@@ -1471,10 +1781,21 @@ func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
if f.Truncated {
// Their header list was too long. Send a 431 error.
handler = handleHeaderListTooLong
- } else if err := checkValidHTTP2Request(req); err != nil {
+ } else if err := checkValidHTTP2RequestHeaders(req.Header); err != nil {
handler = new400Handler(err)
}
+ // The net/http package sets the read deadline from the
+ // http.Server.ReadTimeout during the TLS handshake, but then
+ // passes the connection off to us with the deadline already
+ // set. Disarm it here after the request headers are read,
+ // similar to how the http1 server works. Here it's
+ // technically more like the http1 Server's ReadHeaderTimeout
+ // (in Go 1.8), though. That's a more sane option anyway.
+ if sc.hs.ReadTimeout != 0 {
+ sc.conn.SetReadDeadline(time.Time{})
+ }
+
go sc.runHandler(rw, req, handler)
return nil
}
@@ -1509,62 +1830,81 @@ func (st *stream) processTrailerHeaders(f *MetaHeadersFrame) error {
return nil
}
+func checkPriority(streamID uint32, p PriorityParam) error {
+ if streamID == p.StreamDep {
+ // Section 5.3.1: "A stream cannot depend on itself. An endpoint MUST treat
+ // this as a stream error (Section 5.4.2) of type PROTOCOL_ERROR."
+ // Section 5.3.3 says that a stream can depend on one of its dependencies,
+ // so it's only self-dependencies that are forbidden.
+ return streamError(streamID, ErrCodeProtocol)
+ }
+ return nil
+}
+
func (sc *serverConn) processPriority(f *PriorityFrame) error {
- adjustStreamPriority(sc.streams, f.StreamID, f.PriorityParam)
+ if sc.inGoAway {
+ return nil
+ }
+ if err := checkPriority(f.StreamID, f.PriorityParam); err != nil {
+ return err
+ }
+ sc.writeSched.AdjustStream(f.StreamID, f.PriorityParam)
return nil
}
-func adjustStreamPriority(streams map[uint32]*stream, streamID uint32, priority PriorityParam) {
- st, ok := streams[streamID]
- if !ok {
- // TODO: not quite correct (this streamID might
- // already exist in the dep tree, but be closed), but
- // close enough for now.
- return
+func (sc *serverConn) newStream(id, pusherID uint32, state streamState) *stream {
+ sc.serveG.check()
+ if id == 0 {
+ panic("internal error: cannot create stream with id 0")
}
- st.weight = priority.Weight
- parent := streams[priority.StreamDep] // might be nil
- if parent == st {
- // if client tries to set this stream to be the parent of itself
- // ignore and keep going
- return
+
+ ctx, cancelCtx := contextWithCancel(sc.baseCtx)
+ st := &stream{
+ sc: sc,
+ id: id,
+ state: state,
+ ctx: ctx,
+ cancelCtx: cancelCtx,
+ }
+ st.cw.Init()
+ st.flow.conn = &sc.flow // link to conn-level counter
+ st.flow.add(sc.initialStreamSendWindowSize)
+ st.inflow.conn = &sc.inflow // link to conn-level counter
+ st.inflow.add(sc.srv.initialStreamRecvWindowSize())
+ if sc.hs.WriteTimeout != 0 {
+ st.writeDeadline = time.AfterFunc(sc.hs.WriteTimeout, st.onWriteTimeout)
}
- // section 5.3.3: If a stream is made dependent on one of its
- // own dependencies, the formerly dependent stream is first
- // moved to be dependent on the reprioritized stream's previous
- // parent. The moved dependency retains its weight.
- for piter := parent; piter != nil; piter = piter.parent {
- if piter == st {
- parent.parent = st.parent
- break
- }
+ sc.streams[id] = st
+ sc.writeSched.OpenStream(st.id, OpenStreamOptions{PusherID: pusherID})
+ if st.isPushed() {
+ sc.curPushedStreams++
+ } else {
+ sc.curClientStreams++
}
- st.parent = parent
- if priority.Exclusive && (st.parent != nil || priority.StreamDep == 0) {
- for _, openStream := range streams {
- if openStream != st && openStream.parent == st.parent {
- openStream.parent = st
- }
- }
+ if sc.curOpenStreams() == 1 {
+ sc.setConnState(http.StateActive)
}
+
+ return st
}
func (sc *serverConn) newWriterAndRequest(st *stream, f *MetaHeadersFrame) (*responseWriter, *http.Request, error) {
sc.serveG.check()
- method := f.PseudoValue("method")
- path := f.PseudoValue("path")
- scheme := f.PseudoValue("scheme")
- authority := f.PseudoValue("authority")
+ rp := requestParam{
+ method: f.PseudoValue("method"),
+ scheme: f.PseudoValue("scheme"),
+ authority: f.PseudoValue("authority"),
+ path: f.PseudoValue("path"),
+ }
- isConnect := method == "CONNECT"
+ isConnect := rp.method == "CONNECT"
if isConnect {
- if path != "" || scheme != "" || authority == "" {
+ if rp.path != "" || rp.scheme != "" || rp.authority == "" {
return nil, nil, streamError(f.StreamID, ErrCodeProtocol)
}
- } else if method == "" || path == "" ||
- (scheme != "https" && scheme != "http") {
+ } else if rp.method == "" || rp.path == "" || (rp.scheme != "https" && rp.scheme != "http") {
// See 8.1.2.6 Malformed Requests and Responses:
//
// Malformed requests or responses that are detected
@@ -1579,36 +1919,62 @@ func (sc *serverConn) newWriterAndRequest(st *stream, f *MetaHeadersFrame) (*res
}
bodyOpen := !f.StreamEnded()
- if method == "HEAD" && bodyOpen {
+ if rp.method == "HEAD" && bodyOpen {
// HEAD requests can't have bodies
return nil, nil, streamError(f.StreamID, ErrCodeProtocol)
}
- var tlsState *tls.ConnectionState // nil if not scheme https
- if scheme == "https" {
- tlsState = sc.tlsState
+ rp.header = make(http.Header)
+ for _, hf := range f.RegularFields() {
+ rp.header.Add(sc.canonicalHeader(hf.Name), hf.Value)
+ }
+ if rp.authority == "" {
+ rp.authority = rp.header.Get("Host")
}
- header := make(http.Header)
- for _, hf := range f.RegularFields() {
- header.Add(sc.canonicalHeader(hf.Name), hf.Value)
+ rw, req, err := sc.newWriterAndRequestNoBody(st, rp)
+ if err != nil {
+ return nil, nil, err
}
+ if bodyOpen {
+ if vv, ok := rp.header["Content-Length"]; ok {
+ req.ContentLength, _ = strconv.ParseInt(vv[0], 10, 64)
+ } else {
+ req.ContentLength = -1
+ }
+ req.Body.(*requestBody).pipe = &pipe{
+ b: &dataBuffer{expected: req.ContentLength},
+ }
+ }
+ return rw, req, nil
+}
+
+type requestParam struct {
+ method string
+ scheme, authority, path string
+ header http.Header
+}
- if authority == "" {
- authority = header.Get("Host")
+func (sc *serverConn) newWriterAndRequestNoBody(st *stream, rp requestParam) (*responseWriter, *http.Request, error) {
+ sc.serveG.check()
+
+ var tlsState *tls.ConnectionState // nil if not scheme https
+ if rp.scheme == "https" {
+ tlsState = sc.tlsState
}
- needsContinue := header.Get("Expect") == "100-continue"
+
+ needsContinue := rp.header.Get("Expect") == "100-continue"
if needsContinue {
- header.Del("Expect")
+ rp.header.Del("Expect")
}
// Merge Cookie headers into one "; "-delimited value.
- if cookies := header["Cookie"]; len(cookies) > 1 {
- header.Set("Cookie", strings.Join(cookies, "; "))
+ if cookies := rp.header["Cookie"]; len(cookies) > 1 {
+ rp.header.Set("Cookie", strings.Join(cookies, "; "))
}
// Setup Trailers
var trailer http.Header
- for _, v := range header["Trailer"] {
+ for _, v := range rp.header["Trailer"] {
for _, key := range strings.Split(v, ",") {
key = http.CanonicalHeaderKey(strings.TrimSpace(key))
switch key {
@@ -1623,57 +1989,42 @@ func (sc *serverConn) newWriterAndRequest(st *stream, f *MetaHeadersFrame) (*res
}
}
}
- delete(header, "Trailer")
+ delete(rp.header, "Trailer")
- body := &requestBody{
- conn: sc,
- stream: st,
- needsContinue: needsContinue,
- }
var url_ *url.URL
var requestURI string
- if isConnect {
- url_ = &url.URL{Host: authority}
- requestURI = authority // mimic HTTP/1 server behavior
+ if rp.method == "CONNECT" {
+ url_ = &url.URL{Host: rp.authority}
+ requestURI = rp.authority // mimic HTTP/1 server behavior
} else {
var err error
- url_, err = url.ParseRequestURI(path)
+ url_, err = url.ParseRequestURI(rp.path)
if err != nil {
- return nil, nil, streamError(f.StreamID, ErrCodeProtocol)
+ return nil, nil, streamError(st.id, ErrCodeProtocol)
}
- requestURI = path
+ requestURI = rp.path
+ }
+
+ body := &requestBody{
+ conn: sc,
+ stream: st,
+ needsContinue: needsContinue,
}
req := &http.Request{
- Method: method,
+ Method: rp.method,
URL: url_,
RemoteAddr: sc.remoteAddrStr,
- Header: header,
+ Header: rp.header,
RequestURI: requestURI,
Proto: "HTTP/2.0",
ProtoMajor: 2,
ProtoMinor: 0,
TLS: tlsState,
- Host: authority,
+ Host: rp.authority,
Body: body,
Trailer: trailer,
}
req = requestWithContext(req, st.ctx)
- if bodyOpen {
- // Disabled, per golang.org/issue/14960:
- // st.reqBuf = sc.getRequestBodyBuf()
- // TODO: remove this 64k of garbage per request (again, but without a data race):
- buf := make([]byte, initialWindowSize)
-
- body.pipe = &pipe{
- b: &fixedBuffer{buf: buf},
- }
-
- if vv, ok := header["Content-Length"]; ok {
- req.ContentLength, _ = strconv.ParseInt(vv[0], 10, 64)
- } else {
- req.ContentLength = -1
- }
- }
rws := responseWriterStatePool.Get().(*responseWriterState)
bwSave := rws.bw
@@ -1689,15 +2040,6 @@ func (sc *serverConn) newWriterAndRequest(st *stream, f *MetaHeadersFrame) (*res
return rw, req, nil
}
-func (sc *serverConn) getRequestBodyBuf() []byte {
- sc.serveG.check()
- if buf := sc.freeRequestBodyBuf; buf != nil {
- sc.freeRequestBodyBuf = nil
- return buf
- }
- return make([]byte, initialWindowSize)
-}
-
// Run on its own goroutine.
func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request, handler func(http.ResponseWriter, *http.Request)) {
didPanic := true
@@ -1705,15 +2047,17 @@ func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request, handler
rw.rws.stream.cancelCtx()
if didPanic {
e := recover()
- // Same as net/http:
- const size = 64 << 10
- buf := make([]byte, size)
- buf = buf[:runtime.Stack(buf, false)]
- sc.writeFrameFromHandler(frameWriteMsg{
+ sc.writeFrameFromHandler(FrameWriteRequest{
write: handlerPanicRST{rw.rws.stream.id},
stream: rw.rws.stream,
})
- sc.logf("http2: panic serving %v: %v\n%s", sc.conn.RemoteAddr(), e, buf)
+ // Same as net/http:
+ if shouldLogPanic(e) {
+ const size = 64 << 10
+ buf := make([]byte, size)
+ buf = buf[:runtime.Stack(buf, false)]
+ sc.logf("http2: panic serving %v: %v\n%s", sc.conn.RemoteAddr(), e, buf)
+ }
return
}
rw.handlerDone()
@@ -1744,7 +2088,7 @@ func (sc *serverConn) writeHeaders(st *stream, headerData *writeResHeaders) erro
// mutates it.
errc = errChanPool.Get().(chan error)
}
- if err := sc.writeFrameFromHandler(frameWriteMsg{
+ if err := sc.writeFrameFromHandler(FrameWriteRequest{
write: headerData,
stream: st,
done: errc,
@@ -1767,7 +2111,7 @@ func (sc *serverConn) writeHeaders(st *stream, headerData *writeResHeaders) erro
// called from handler goroutines.
func (sc *serverConn) write100ContinueHeaders(st *stream) {
- sc.writeFrameFromHandler(frameWriteMsg{
+ sc.writeFrameFromHandler(FrameWriteRequest{
write: write100ContinueHeadersFrame{st.id},
stream: st,
})
@@ -1783,11 +2127,13 @@ type bodyReadMsg struct {
// called from handler goroutines.
// Notes that the handler for the given stream ID read n bytes of its body
// and schedules flow control tokens to be sent.
-func (sc *serverConn) noteBodyReadFromHandler(st *stream, n int) {
+func (sc *serverConn) noteBodyReadFromHandler(st *stream, n int, err error) {
sc.serveG.checkNotOn() // NOT on
- select {
- case sc.bodyReadCh <- bodyReadMsg{st, n}:
- case <-sc.doneServing:
+ if n > 0 {
+ select {
+ case sc.bodyReadCh <- bodyReadMsg{st, n}:
+ case <-sc.doneServing:
+ }
}
}
@@ -1830,7 +2176,7 @@ func (sc *serverConn) sendWindowUpdate32(st *stream, n int32) {
if st != nil {
streamID = st.id
}
- sc.writeFrame(frameWriteMsg{
+ sc.writeFrame(FrameWriteRequest{
write: writeWindowUpdate{streamID: streamID, n: uint32(n)},
stream: st,
})
@@ -1845,16 +2191,19 @@ func (sc *serverConn) sendWindowUpdate32(st *stream, n int32) {
}
}
+// requestBody is the Handler's Request.Body type.
+// Read and Close may be called concurrently.
type requestBody struct {
stream *stream
conn *serverConn
- closed bool
+ closed bool // for use by Close only
+ sawEOF bool // for use by Read only
pipe *pipe // non-nil if we have a HTTP entity message body
needsContinue bool // need to send a 100-continue
}
func (b *requestBody) Close() error {
- if b.pipe != nil {
+ if b.pipe != nil && !b.closed {
b.pipe.BreakWithError(errClosedBody)
}
b.closed = true
@@ -1866,18 +2215,22 @@ func (b *requestBody) Read(p []byte) (n int, err error) {
b.needsContinue = false
b.conn.write100ContinueHeaders(b.stream)
}
- if b.pipe == nil {
+ if b.pipe == nil || b.sawEOF {
return 0, io.EOF
}
n, err = b.pipe.Read(p)
- if n > 0 {
- b.conn.noteBodyReadFromHandler(b.stream, n)
+ if err == io.EOF {
+ b.sawEOF = true
}
+ if b.conn == nil && inTests {
+ return
+ }
+ b.conn.noteBodyReadFromHandler(b.stream, n, err)
return
}
-// responseWriter is the http.ResponseWriter implementation. It's
-// intentionally small (1 pointer wide) to minimize garbage. The
+// responseWriter is the http.ResponseWriter implementation. It's
+// intentionally small (1 pointer wide) to minimize garbage. The
// responseWriterState pointer inside is zeroed at the end of a
// request (in handlerDone) and calls on the responseWriter thereafter
// simply crash (caller's mistake), but the much larger responseWriterState
@@ -1911,6 +2264,7 @@ type responseWriterState struct {
wroteHeader bool // WriteHeader called (explicitly or implicitly). Not necessarily sent to user yet.
sentHeader bool // have we sent the header frame?
handlerDone bool // handler has finished
+ dirty bool // a Write failed; don't reuse this responseWriterState
sentContentLen int64 // non-zero if handler set a Content-Length header
wroteBytes int64
@@ -1931,7 +2285,7 @@ func (rws *responseWriterState) hasTrailers() bool { return len(rws.trailers) !=
func (rws *responseWriterState) declareTrailer(k string) {
k = http.CanonicalHeaderKey(k)
if !ValidTrailerHeader(k) {
- // Forbidden by RFC 2616 14.40.
+ // Forbidden by RFC 7230, section 4.1.2.
rws.conn.logf("ignoring invalid trailer %q", k)
return
}
@@ -1968,7 +2322,7 @@ func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) {
clen = strconv.Itoa(len(p))
}
_, hasContentType := rws.snapHeader["Content-Type"]
- if !hasContentType && bodyAllowedForStatus(rws.status) {
+ if !hasContentType && bodyAllowedForStatus(rws.status) && len(p) > 0 {
ctype = http.DetectContentType(p)
}
var date string
@@ -1992,6 +2346,7 @@ func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) {
date: date,
})
if err != nil {
+ rws.dirty = true
return 0, err
}
if endStream {
@@ -2013,6 +2368,7 @@ func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) {
if len(p) > 0 || endStream {
// only send a 0 byte DATA frame if we're ending the stream.
if err := rws.conn.writeDataFromHandler(rws.stream, p, endStream); err != nil {
+ rws.dirty = true
return 0, err
}
}
@@ -2024,6 +2380,9 @@ func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) {
trailers: rws.trailers,
endStream: true,
})
+ if err != nil {
+ rws.dirty = true
+ }
return len(p), err
}
return len(p), nil
@@ -2047,11 +2406,11 @@ const TrailerPrefix = "Trailer:"
// after the header has already been flushed. Because the Go
// ResponseWriter interface has no way to set Trailers (only the
// Header), and because we didn't want to expand the ResponseWriter
-// interface, and because nobody used trailers, and because RFC 2616
+// interface, and because nobody used trailers, and because RFC 7230
// says you SHOULD (but not must) predeclare any trailers in the
// header, the official ResponseWriter rules said trailers in Go must
// be predeclared, and then we reuse the same ResponseWriter.Header()
-// map to mean both Headers and Trailers. When it's time to write the
+// map to mean both Headers and Trailers. When it's time to write the
// Trailers, we pick out the fields of Headers that were declared as
// trailers. That worked for a while, until we found the first major
// user of Trailers in the wild: gRPC (using them only over http2),
@@ -2110,8 +2469,9 @@ func (w *responseWriter) CloseNotify() <-chan bool {
if ch == nil {
ch = make(chan bool, 1)
rws.closeNotifierCh = ch
+ cw := rws.stream.cw
go func() {
- rws.stream.cw.Wait() // wait for close
+ cw.Wait() // wait for close
ch <- true
}()
}
@@ -2130,6 +2490,24 @@ func (w *responseWriter) Header() http.Header {
return rws.handlerHeader
}
+// checkWriteHeaderCode is a copy of net/http's checkWriteHeaderCode.
+func checkWriteHeaderCode(code int) {
+ // Issue 22880: require valid WriteHeader status codes.
+ // For now we only enforce that it's three digits.
+ // In the future we might block things over 599 (600 and above aren't defined
+ // at http://httpwg.org/specs/rfc7231.html#status.codes)
+ // and we might block under 200 (once we have more mature 1xx support).
+ // But for now any three digits.
+ //
+ // We used to send "HTTP/1.1 000 0" on the wire in responses but there's
+ // no equivalent bogus thing we can realistically send in HTTP/2,
+ // so we'll consistently panic instead and help people find their bugs
+ // early. (We can't return an error from WriteHeader even if we wanted to.)
+ if code < 100 || code > 999 {
+ panic(fmt.Sprintf("invalid WriteHeader code %v", code))
+ }
+}
+
func (w *responseWriter) WriteHeader(code int) {
rws := w.rws
if rws == nil {
@@ -2140,6 +2518,7 @@ func (w *responseWriter) WriteHeader(code int) {
func (rws *responseWriterState) writeHeader(code int) {
if !rws.wroteHeader {
+ checkWriteHeaderCode(code)
rws.wroteHeader = true
rws.status = code
if len(rws.handlerHeader) > 0 {
@@ -2162,7 +2541,7 @@ func cloneHeader(h http.Header) http.Header {
//
// * Handler calls w.Write or w.WriteString ->
// * -> rws.bw (*bufio.Writer) ->
-// * (Handler migth call Flush)
+// * (Handler might call Flush)
// * -> chunkWriter{rws}
// * -> responseWriterState.writeChunk(p []byte)
// * -> responseWriterState.writeChunk (most of the magic; see comment there)
@@ -2201,14 +2580,217 @@ func (w *responseWriter) write(lenData int, dataB []byte, dataS string) (n int,
func (w *responseWriter) handlerDone() {
rws := w.rws
+ dirty := rws.dirty
rws.handlerDone = true
w.Flush()
w.rws = nil
- responseWriterStatePool.Put(rws)
+ if !dirty {
+ // Only recycle the pool if all prior Write calls to
+ // the serverConn goroutine completed successfully. If
+ // they returned earlier due to resets from the peer
+ // there might still be write goroutines outstanding
+ // from the serverConn referencing the rws memory. See
+ // issue 20704.
+ responseWriterStatePool.Put(rws)
+ }
+}
+
+// Push errors.
+var (
+ ErrRecursivePush = errors.New("http2: recursive push not allowed")
+ ErrPushLimitReached = errors.New("http2: push would exceed peer's SETTINGS_MAX_CONCURRENT_STREAMS")
+)
+
+// pushOptions is the internal version of http.PushOptions, which we
+// cannot include here because it's only defined in Go 1.8 and later.
+type pushOptions struct {
+ Method string
+ Header http.Header
+}
+
+func (w *responseWriter) push(target string, opts pushOptions) error {
+ st := w.rws.stream
+ sc := st.sc
+ sc.serveG.checkNotOn()
+
+ // No recursive pushes: "PUSH_PROMISE frames MUST only be sent on a peer-initiated stream."
+ // http://tools.ietf.org/html/rfc7540#section-6.6
+ if st.isPushed() {
+ return ErrRecursivePush
+ }
+
+ // Default options.
+ if opts.Method == "" {
+ opts.Method = "GET"
+ }
+ if opts.Header == nil {
+ opts.Header = http.Header{}
+ }
+ wantScheme := "http"
+ if w.rws.req.TLS != nil {
+ wantScheme = "https"
+ }
+
+ // Validate the request.
+ u, err := url.Parse(target)
+ if err != nil {
+ return err
+ }
+ if u.Scheme == "" {
+ if !strings.HasPrefix(target, "/") {
+ return fmt.Errorf("target must be an absolute URL or an absolute path: %q", target)
+ }
+ u.Scheme = wantScheme
+ u.Host = w.rws.req.Host
+ } else {
+ if u.Scheme != wantScheme {
+ return fmt.Errorf("cannot push URL with scheme %q from request with scheme %q", u.Scheme, wantScheme)
+ }
+ if u.Host == "" {
+ return errors.New("URL must have a host")
+ }
+ }
+ for k := range opts.Header {
+ if strings.HasPrefix(k, ":") {
+ return fmt.Errorf("promised request headers cannot include pseudo header %q", k)
+ }
+ // These headers are meaningful only if the request has a body,
+ // but PUSH_PROMISE requests cannot have a body.
+ // http://tools.ietf.org/html/rfc7540#section-8.2
+ // Also disallow Host, since the promised URL must be absolute.
+ switch strings.ToLower(k) {
+ case "content-length", "content-encoding", "trailer", "te", "expect", "host":
+ return fmt.Errorf("promised request headers cannot include %q", k)
+ }
+ }
+ if err := checkValidHTTP2RequestHeaders(opts.Header); err != nil {
+ return err
+ }
+
+ // The RFC effectively limits promised requests to GET and HEAD:
+ // "Promised requests MUST be cacheable [GET, HEAD, or POST], and MUST be safe [GET or HEAD]"
+ // http://tools.ietf.org/html/rfc7540#section-8.2
+ if opts.Method != "GET" && opts.Method != "HEAD" {
+ return fmt.Errorf("method %q must be GET or HEAD", opts.Method)
+ }
+
+ msg := &startPushRequest{
+ parent: st,
+ method: opts.Method,
+ url: u,
+ header: cloneHeader(opts.Header),
+ done: errChanPool.Get().(chan error),
+ }
+
+ select {
+ case <-sc.doneServing:
+ return errClientDisconnected
+ case <-st.cw:
+ return errStreamClosed
+ case sc.serveMsgCh <- msg:
+ }
+
+ select {
+ case <-sc.doneServing:
+ return errClientDisconnected
+ case <-st.cw:
+ return errStreamClosed
+ case err := <-msg.done:
+ errChanPool.Put(msg.done)
+ return err
+ }
+}
+
+type startPushRequest struct {
+ parent *stream
+ method string
+ url *url.URL
+ header http.Header
+ done chan error
+}
+
+func (sc *serverConn) startPush(msg *startPushRequest) {
+ sc.serveG.check()
+
+ // http://tools.ietf.org/html/rfc7540#section-6.6.
+ // PUSH_PROMISE frames MUST only be sent on a peer-initiated stream that
+ // is in either the "open" or "half-closed (remote)" state.
+ if msg.parent.state != stateOpen && msg.parent.state != stateHalfClosedRemote {
+ // responseWriter.Push checks that the stream is peer-initiaed.
+ msg.done <- errStreamClosed
+ return
+ }
+
+ // http://tools.ietf.org/html/rfc7540#section-6.6.
+ if !sc.pushEnabled {
+ msg.done <- http.ErrNotSupported
+ return
+ }
+
+ // PUSH_PROMISE frames must be sent in increasing order by stream ID, so
+ // we allocate an ID for the promised stream lazily, when the PUSH_PROMISE
+ // is written. Once the ID is allocated, we start the request handler.
+ allocatePromisedID := func() (uint32, error) {
+ sc.serveG.check()
+
+ // Check this again, just in case. Technically, we might have received
+ // an updated SETTINGS by the time we got around to writing this frame.
+ if !sc.pushEnabled {
+ return 0, http.ErrNotSupported
+ }
+ // http://tools.ietf.org/html/rfc7540#section-6.5.2.
+ if sc.curPushedStreams+1 > sc.clientMaxStreams {
+ return 0, ErrPushLimitReached
+ }
+
+ // http://tools.ietf.org/html/rfc7540#section-5.1.1.
+ // Streams initiated by the server MUST use even-numbered identifiers.
+ // A server that is unable to establish a new stream identifier can send a GOAWAY
+ // frame so that the client is forced to open a new connection for new streams.
+ if sc.maxPushPromiseID+2 >= 1<<31 {
+ sc.startGracefulShutdownInternal()
+ return 0, ErrPushLimitReached
+ }
+ sc.maxPushPromiseID += 2
+ promisedID := sc.maxPushPromiseID
+
+ // http://tools.ietf.org/html/rfc7540#section-8.2.
+ // Strictly speaking, the new stream should start in "reserved (local)", then
+ // transition to "half closed (remote)" after sending the initial HEADERS, but
+ // we start in "half closed (remote)" for simplicity.
+ // See further comments at the definition of stateHalfClosedRemote.
+ promised := sc.newStream(promisedID, msg.parent.id, stateHalfClosedRemote)
+ rw, req, err := sc.newWriterAndRequestNoBody(promised, requestParam{
+ method: msg.method,
+ scheme: msg.url.Scheme,
+ authority: msg.url.Host,
+ path: msg.url.RequestURI(),
+ header: cloneHeader(msg.header), // clone since handler runs concurrently with writing the PUSH_PROMISE
+ })
+ if err != nil {
+ // Should not happen, since we've already validated msg.url.
+ panic(fmt.Sprintf("newWriterAndRequestNoBody(%+v): %v", msg.url, err))
+ }
+
+ go sc.runHandler(rw, req, sc.handler.ServeHTTP)
+ return promisedID, nil
+ }
+
+ sc.writeFrame(FrameWriteRequest{
+ write: &writePushPromise{
+ streamID: msg.parent.id,
+ method: msg.method,
+ url: msg.url,
+ h: msg.header,
+ allocatePromisedID: allocatePromisedID,
+ },
+ stream: msg.parent,
+ done: msg.done,
+ })
}
// foreachHeaderElement splits v according to the "#rule" construction
-// in RFC 2616 section 2.1 and calls fn for each non-empty element.
+// in RFC 7230 section 7 and calls fn for each non-empty element.
func foreachHeaderElement(v string, fn func(string)) {
v = textproto.TrimString(v)
if v == "" {
@@ -2234,16 +2816,16 @@ var connHeaders = []string{
"Upgrade",
}
-// checkValidHTTP2Request checks whether req is a valid HTTP/2 request,
+// checkValidHTTP2RequestHeaders checks whether h is a valid HTTP/2 request,
// per RFC 7540 Section 8.1.2.2.
// The returned error is reported to users.
-func checkValidHTTP2Request(req *http.Request) error {
- for _, h := range connHeaders {
- if _, ok := req.Header[h]; ok {
- return fmt.Errorf("request header %q is not valid in HTTP/2", h)
+func checkValidHTTP2RequestHeaders(h http.Header) error {
+ for _, k := range connHeaders {
+ if _, ok := h[k]; ok {
+ return fmt.Errorf("request header %q is not valid in HTTP/2", k)
}
}
- te := req.Header["Te"]
+ te := h["Te"]
if len(te) > 0 && (len(te) > 1 || (te[0] != "trailers" && te[0] != "")) {
return errors.New(`request header "TE" may only be "trailers" in HTTP/2`)
}
@@ -2290,3 +2872,17 @@ var badTrailer = map[string]bool{
"Transfer-Encoding": true,
"Www-Authenticate": true,
}
+
+// h1ServerKeepAlivesDisabled reports whether hs has its keep-alives
+// disabled. See comments on h1ServerShutdownChan above for why
+// the code is written this way.
+func h1ServerKeepAlivesDisabled(hs *http.Server) bool {
+ var x interface{} = hs
+ type I interface {
+ doKeepAlives() bool
+ }
+ if hs, ok := x.(I); ok {
+ return !hs.doKeepAlives()
+ }
+ return false
+}