Welcome to mirror list, hosted at ThFree Co, Russian Federation.

shared_limit_listener.go « netutil « internal - gitlab.com/gitlab-org/gitlab-pages.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 3f88e5918f7c27effa60950cb407152f0d9cc0a4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package netutil

import (
	"errors"
	"net"
	"sync"
	"time"
)

var (
	errKeepaliveNotSupported = errors.New("Keepalive not supported")
)

// SharedLimitListener returns a Listener that accepts simultaneous
// connections from the provided Listener only if a shared availability pool
// permits it. Based on https://godoc.org/golang.org/x/net/netutil
func SharedLimitListener(listener net.Listener, limiter *Limiter) net.Listener {
	return &sharedLimitListener{
		Listener: listener,
		limiter:  limiter,
		done:     make(chan struct{}),
	}
}

// Limiter is used to provide a shared pool of connection slots. Use NewLimiter
// to create an instance
type Limiter struct {
	sem chan struct{}
}

// NewLimiter creates a Limiter with the given capacity
func NewLimiter(n int) *Limiter {
	return &Limiter{
		sem: make(chan struct{}, n),
	}
}

type sharedLimitListener struct {
	net.Listener
	closeOnce sync.Once     // ensures the done chan is only closed once
	limiter   *Limiter      // A pool of connection slots shared with other listeners
	done      chan struct{} // no values sent; closed when Close is called
}

// acquire acquires the limiting semaphore. Returns true if successfully
// accquired, false if the listener is closed and the semaphore is not
// acquired.
func (l *sharedLimitListener) acquire() bool {
	select {
	case <-l.done:
		return false
	case l.limiter.sem <- struct{}{}:
		return true
	}
}
func (l *sharedLimitListener) release() { <-l.limiter.sem }

func (l *sharedLimitListener) Accept() (net.Conn, error) {
	acquired := l.acquire()
	// If the semaphore isn't acquired because the listener was closed, expect
	// that this call to accept won't block, but immediately return an error.
	c, err := l.Listener.Accept()
	if err != nil {
		if acquired {
			l.release()
		}
		return nil, err
	}

	// Support TCP Keepalive operations if possible
	tcpConn, _ := c.(*net.TCPConn)

	return &sharedLimitListenerConn{
		Conn:    c,
		tcpConn: tcpConn,
		release: l.release,
	}, nil
}

func (l *sharedLimitListener) Close() error {
	err := l.Listener.Close()
	l.closeOnce.Do(func() { close(l.done) })
	return err
}

type sharedLimitListenerConn struct {
	net.Conn
	tcpConn     *net.TCPConn
	releaseOnce sync.Once
	release     func()
}

func (c *sharedLimitListenerConn) Close() error {
	err := c.Conn.Close()
	c.releaseOnce.Do(c.release)
	return err
}

func (c *sharedLimitListenerConn) SetKeepAlive(enabled bool) error {
	if c.tcpConn == nil {
		return errKeepaliveNotSupported
	}

	return c.tcpConn.SetKeepAlive(enabled)
}

func (c *sharedLimitListenerConn) SetKeepAlivePeriod(period time.Duration) error {
	if c.tcpConn == nil {
		return errKeepaliveNotSupported
	}

	return c.tcpConn.SetKeepAlivePeriod(period)
}