Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-pages.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Thomas <nick@gitlab.com>2018-10-24 05:11:29 +0300
committerNick Thomas <nick@gitlab.com>2018-10-26 23:09:50 +0300
commit513a357961b1693c1d3f8450430d72b2d1d354be (patch)
treebc186e85fecb069f08681dcc48d683c9514306f1 /internal/netutil
parent49cc251dafd31762dd9eca096a9eba963c469a26 (diff)
Allow the maximum connection concurrency to be specified
Diffstat (limited to 'internal/netutil')
-rw-r--r--internal/netutil/shared_limit_listener.go113
1 files changed, 113 insertions, 0 deletions
diff --git a/internal/netutil/shared_limit_listener.go b/internal/netutil/shared_limit_listener.go
new file mode 100644
index 00000000..3f88e591
--- /dev/null
+++ b/internal/netutil/shared_limit_listener.go
@@ -0,0 +1,113 @@
+package netutil
+
+import (
+ "errors"
+ "net"
+ "sync"
+ "time"
+)
+
+var (
+ errKeepaliveNotSupported = errors.New("Keepalive not supported")
+)
+
+// SharedLimitListener returns a Listener that accepts simultaneous
+// connections from the provided Listener only if a shared availability pool
+// permits it. Based on https://godoc.org/golang.org/x/net/netutil
+func SharedLimitListener(listener net.Listener, limiter *Limiter) net.Listener {
+ return &sharedLimitListener{
+ Listener: listener,
+ limiter: limiter,
+ done: make(chan struct{}),
+ }
+}
+
+// Limiter is used to provide a shared pool of connection slots. Use NewLimiter
+// to create an instance
+type Limiter struct {
+ sem chan struct{}
+}
+
+// NewLimiter creates a Limiter with the given capacity
+func NewLimiter(n int) *Limiter {
+ return &Limiter{
+ sem: make(chan struct{}, n),
+ }
+}
+
+type sharedLimitListener struct {
+ net.Listener
+ closeOnce sync.Once // ensures the done chan is only closed once
+ limiter *Limiter // A pool of connection slots shared with other listeners
+ done chan struct{} // no values sent; closed when Close is called
+}
+
+// acquire acquires the limiting semaphore. Returns true if successfully
+// accquired, false if the listener is closed and the semaphore is not
+// acquired.
+func (l *sharedLimitListener) acquire() bool {
+ select {
+ case <-l.done:
+ return false
+ case l.limiter.sem <- struct{}{}:
+ return true
+ }
+}
+func (l *sharedLimitListener) release() { <-l.limiter.sem }
+
+func (l *sharedLimitListener) Accept() (net.Conn, error) {
+ acquired := l.acquire()
+ // If the semaphore isn't acquired because the listener was closed, expect
+ // that this call to accept won't block, but immediately return an error.
+ c, err := l.Listener.Accept()
+ if err != nil {
+ if acquired {
+ l.release()
+ }
+ return nil, err
+ }
+
+ // Support TCP Keepalive operations if possible
+ tcpConn, _ := c.(*net.TCPConn)
+
+ return &sharedLimitListenerConn{
+ Conn: c,
+ tcpConn: tcpConn,
+ release: l.release,
+ }, nil
+}
+
+func (l *sharedLimitListener) Close() error {
+ err := l.Listener.Close()
+ l.closeOnce.Do(func() { close(l.done) })
+ return err
+}
+
+type sharedLimitListenerConn struct {
+ net.Conn
+ tcpConn *net.TCPConn
+ releaseOnce sync.Once
+ release func()
+}
+
+func (c *sharedLimitListenerConn) Close() error {
+ err := c.Conn.Close()
+ c.releaseOnce.Do(c.release)
+ return err
+}
+
+func (c *sharedLimitListenerConn) SetKeepAlive(enabled bool) error {
+ if c.tcpConn == nil {
+ return errKeepaliveNotSupported
+ }
+
+ return c.tcpConn.SetKeepAlive(enabled)
+}
+
+func (c *sharedLimitListenerConn) SetKeepAlivePeriod(period time.Duration) error {
+ if c.tcpConn == nil {
+ return errKeepaliveNotSupported
+ }
+
+ return c.tcpConn.SetKeepAlivePeriod(period)
+}