diff options
author | Alessio Caiazza <acaiazza@gitlab.com> | 2018-10-29 11:10:09 +0300 |
---|---|---|
committer | Alessio Caiazza <acaiazza@gitlab.com> | 2018-10-29 11:10:09 +0300 |
commit | e37801aeeb82ed66bd143cb286258566d3391c53 (patch) | |
tree | e49efcf1e859162ee5be50168af63db4bb132d91 | |
parent | dedadb37ddd3e3dd5a1f3ac9ea5496c602556443 (diff) | |
parent | 513a357961b1693c1d3f8450430d72b2d1d354be (diff) |
Merge branch '172-runtime-thread-limit' into 'master'
Allow the maximum connection concurrency to be set
See merge request gitlab-org/gitlab-pages!117
-rw-r--r-- | app.go | 11 | ||||
-rw-r--r-- | app_config.go | 1 | ||||
-rw-r--r-- | internal/netutil/shared_limit_listener.go | 113 | ||||
-rw-r--r-- | main.go | 2 | ||||
-rw-r--r-- | server.go | 40 |
5 files changed, 150 insertions, 17 deletions
@@ -22,6 +22,7 @@ import ( "gitlab.com/gitlab-org/gitlab-pages/internal/auth" "gitlab.com/gitlab-org/gitlab-pages/internal/domain" "gitlab.com/gitlab-org/gitlab-pages/internal/httperrors" + "gitlab.com/gitlab-org/gitlab-pages/internal/netutil" "gitlab.com/gitlab-org/gitlab-pages/metrics" ) @@ -243,12 +244,14 @@ func (a *theApp) UpdateDomains(dm domain.Map) { func (a *theApp) Run() { var wg sync.WaitGroup + limiter := netutil.NewLimiter(a.MaxConns) + // Listen for HTTP for _, fd := range a.ListenHTTP { wg.Add(1) go func(fd uintptr) { defer wg.Done() - err := listenAndServe(fd, a.ServeHTTP, a.HTTP2, nil) + err := listenAndServe(fd, a.ServeHTTP, a.HTTP2, nil, limiter) if err != nil { fatal(err) } @@ -260,7 +263,7 @@ func (a *theApp) Run() { wg.Add(1) go func(fd uintptr) { defer wg.Done() - err := listenAndServeTLS(fd, a.RootCertificate, a.RootKey, a.ServeHTTP, a.ServeTLS, a.HTTP2) + err := listenAndServeTLS(fd, a.RootCertificate, a.RootKey, a.ServeHTTP, a.ServeTLS, a.HTTP2, limiter) if err != nil { fatal(err) } @@ -272,7 +275,7 @@ func (a *theApp) Run() { wg.Add(1) go func(fd uintptr) { defer wg.Done() - err := listenAndServe(fd, a.ServeProxy, a.HTTP2, nil) + err := listenAndServe(fd, a.ServeProxy, a.HTTP2, nil, limiter) if err != nil { fatal(err) } @@ -286,7 +289,7 @@ func (a *theApp) Run() { defer wg.Done() handler := promhttp.HandlerFor(prometheus.DefaultGatherer, promhttp.HandlerOpts{}).ServeHTTP - err := listenAndServe(fd, handler, false, nil) + err := listenAndServe(fd, handler, false, nil, nil) if err != nil { fatal(err) } diff --git a/app_config.go b/app_config.go index ab8cc264..9ff26b6b 100644 --- a/app_config.go +++ b/app_config.go @@ -9,6 +9,7 @@ type appConfig struct { AdminCertificate []byte AdminKey []byte AdminToken []byte + MaxConns int ListenHTTP []uintptr ListenHTTPS []uintptr diff --git a/internal/netutil/shared_limit_listener.go b/internal/netutil/shared_limit_listener.go new file mode 100644 index 00000000..3f88e591 --- /dev/null +++ b/internal/netutil/shared_limit_listener.go @@ -0,0 +1,113 @@ +package netutil + +import ( + "errors" + "net" + "sync" + "time" +) + +var ( + errKeepaliveNotSupported = errors.New("Keepalive not supported") +) + +// SharedLimitListener returns a Listener that accepts simultaneous +// connections from the provided Listener only if a shared availability pool +// permits it. Based on https://godoc.org/golang.org/x/net/netutil +func SharedLimitListener(listener net.Listener, limiter *Limiter) net.Listener { + return &sharedLimitListener{ + Listener: listener, + limiter: limiter, + done: make(chan struct{}), + } +} + +// Limiter is used to provide a shared pool of connection slots. Use NewLimiter +// to create an instance +type Limiter struct { + sem chan struct{} +} + +// NewLimiter creates a Limiter with the given capacity +func NewLimiter(n int) *Limiter { + return &Limiter{ + sem: make(chan struct{}, n), + } +} + +type sharedLimitListener struct { + net.Listener + closeOnce sync.Once // ensures the done chan is only closed once + limiter *Limiter // A pool of connection slots shared with other listeners + done chan struct{} // no values sent; closed when Close is called +} + +// acquire acquires the limiting semaphore. Returns true if successfully +// accquired, false if the listener is closed and the semaphore is not +// acquired. +func (l *sharedLimitListener) acquire() bool { + select { + case <-l.done: + return false + case l.limiter.sem <- struct{}{}: + return true + } +} +func (l *sharedLimitListener) release() { <-l.limiter.sem } + +func (l *sharedLimitListener) Accept() (net.Conn, error) { + acquired := l.acquire() + // If the semaphore isn't acquired because the listener was closed, expect + // that this call to accept won't block, but immediately return an error. + c, err := l.Listener.Accept() + if err != nil { + if acquired { + l.release() + } + return nil, err + } + + // Support TCP Keepalive operations if possible + tcpConn, _ := c.(*net.TCPConn) + + return &sharedLimitListenerConn{ + Conn: c, + tcpConn: tcpConn, + release: l.release, + }, nil +} + +func (l *sharedLimitListener) Close() error { + err := l.Listener.Close() + l.closeOnce.Do(func() { close(l.done) }) + return err +} + +type sharedLimitListenerConn struct { + net.Conn + tcpConn *net.TCPConn + releaseOnce sync.Once + release func() +} + +func (c *sharedLimitListenerConn) Close() error { + err := c.Conn.Close() + c.releaseOnce.Do(c.release) + return err +} + +func (c *sharedLimitListenerConn) SetKeepAlive(enabled bool) error { + if c.tcpConn == nil { + return errKeepaliveNotSupported + } + + return c.tcpConn.SetKeepAlive(enabled) +} + +func (c *sharedLimitListenerConn) SetKeepAlivePeriod(period time.Duration) error { + if c.tcpConn == nil { + return errKeepaliveNotSupported + } + + return c.tcpConn.SetKeepAlivePeriod(period) +} @@ -50,6 +50,7 @@ var ( clientID = flag.String("auth-client-id", "", "GitLab application Client ID") clientSecret = flag.String("auth-client-secret", "", "GitLab application Client Secret") redirectURI = flag.String("auth-redirect-uri", "", "GitLab application redirect URI") + maxConns = flag.Uint("max-conns", 5000, "Limit on the number of concurrent connections to the HTTP, HTTPS or proxy listeners") disableCrossOriginRequests = flag.Bool("disable-cross-origin-requests", false, "Disable cross-origin requests") @@ -80,6 +81,7 @@ func configFromFlags() appConfig { config.StatusPath = *pagesStatus config.LogFormat = *logFormat config.LogVerbose = *logVerbose + config.MaxConns = int(*maxConns) for _, file := range []struct { contents *[]byte @@ -10,25 +10,35 @@ import ( "github.com/gorilla/context" "golang.org/x/net/http2" + + "gitlab.com/gitlab-org/gitlab-pages/internal/netutil" ) type tlsHandlerFunc func(*tls.ClientHelloInfo) (*tls.Certificate, error) -type tcpKeepAliveListener struct { - *net.TCPListener +type keepAliveListener struct { + net.Listener +} + +type keepAliveSetter interface { + SetKeepAlive(bool) error + SetKeepAlivePeriod(time.Duration) error } -func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) { - tc, err := ln.AcceptTCP() +func (ln *keepAliveListener) Accept() (net.Conn, error) { + conn, err := ln.Listener.Accept() if err != nil { - return + return nil, err } - tc.SetKeepAlive(true) - tc.SetKeepAlivePeriod(3 * time.Minute) - return tc, nil + + kc := conn.(keepAliveSetter) + kc.SetKeepAlive(true) + kc.SetKeepAlivePeriod(3 * time.Minute) + + return conn, nil } -func listenAndServe(fd uintptr, handler http.HandlerFunc, useHTTP2 bool, tlsConfig *tls.Config) error { +func listenAndServe(fd uintptr, handler http.HandlerFunc, useHTTP2 bool, tlsConfig *tls.Config, limiter *netutil.Limiter) error { // create server server := &http.Server{Handler: context.ClearHandler(handler), TLSConfig: tlsConfig} @@ -44,14 +54,18 @@ func listenAndServe(fd uintptr, handler http.HandlerFunc, useHTTP2 bool, tlsConf return fmt.Errorf("failed to listen on FD %d: %v", fd, err) } + if limiter != nil { + l = netutil.SharedLimitListener(l, limiter) + } + if tlsConfig != nil { - tlsListener := tls.NewListener(tcpKeepAliveListener{l.(*net.TCPListener)}, server.TLSConfig) + tlsListener := tls.NewListener(&keepAliveListener{l}, server.TLSConfig) return server.Serve(tlsListener) } - return server.Serve(&tcpKeepAliveListener{l.(*net.TCPListener)}) + return server.Serve(&keepAliveListener{l}) } -func listenAndServeTLS(fd uintptr, cert, key []byte, handler http.HandlerFunc, tlsHandler tlsHandlerFunc, useHTTP2 bool) error { +func listenAndServeTLS(fd uintptr, cert, key []byte, handler http.HandlerFunc, tlsHandler tlsHandlerFunc, useHTTP2 bool, limiter *netutil.Limiter) error { certificate, err := tls.X509KeyPair(cert, key) if err != nil { return err @@ -62,5 +76,5 @@ func listenAndServeTLS(fd uintptr, cert, key []byte, handler http.HandlerFunc, t tlsConfig.Certificates = []tls.Certificate{ certificate, } - return listenAndServe(fd, handler, useHTTP2, tlsConfig) + return listenAndServe(fd, handler, useHTTP2, tlsConfig, limiter) } |