Welcome to mirror list, hosted at ThFree Co, Russian Federation.

shared_limit_listener.go « netutil « internal - gitlab.com/gitlab-org/gitlab-pages.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: c4c669639a2510fad90c99abf091fd47dd65bddb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package netutil

import (
	"errors"
	"net"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

var (
	errKeepaliveNotSupported = errors.New("keepalive not supported")
)

// SharedLimitListener returns a Listener that accepts simultaneous
// connections from the provided Listener only if a shared availability pool
// permits it. Based on https://godoc.org/golang.org/x/net/netutil
func SharedLimitListener(listener net.Listener, limiter *Limiter) net.Listener {
	return &sharedLimitListener{
		Listener: listener,
		limiter:  limiter,
		done:     make(chan struct{}),
	}
}

// Limiter is used to provide a shared pool of connection slots. Use NewLimiter
// to create an instance
type Limiter struct {
	sem                  chan struct{}
	concurrentConnsCount prometheus.Gauge
	waitingConnsCount    prometheus.Gauge
}

// NewLimiterWithMetrics creates a Limiter with metrics
func NewLimiterWithMetrics(n int, maxConnsCount, concurrentConnsCount, waitingConnsCount prometheus.Gauge) *Limiter {
	maxConnsCount.Set(float64(n))

	return &Limiter{
		sem:                  make(chan struct{}, n),
		concurrentConnsCount: concurrentConnsCount,
		waitingConnsCount:    waitingConnsCount,
	}
}

type sharedLimitListener struct {
	net.Listener
	closeOnce sync.Once     // ensures the done chan is only closed once
	limiter   *Limiter      // A pool of connection slots shared with other listeners
	done      chan struct{} // no values sent; closed when Close is called
}

// acquire acquires the limiting semaphore. Returns true if successfully
// accquired, false if the listener is closed and the semaphore is not
// acquired.
func (l *sharedLimitListener) acquire() bool {
	l.limiter.waitingConnsCount.Inc()
	defer l.limiter.waitingConnsCount.Dec()

	select {
	case <-l.done:
		return false
	case l.limiter.sem <- struct{}{}:
		l.limiter.concurrentConnsCount.Inc()
		return true
	}
}
func (l *sharedLimitListener) release() {
	<-l.limiter.sem
	l.limiter.concurrentConnsCount.Dec()
}

func (l *sharedLimitListener) Accept() (net.Conn, error) {
	acquired := l.acquire()
	// If the semaphore isn't acquired because the listener was closed, expect
	// that this call to accept won't block, but immediately return an error.
	c, err := l.Listener.Accept()
	if err != nil {
		if acquired {
			l.release()
		}
		return nil, err
	}

	// Support TCP Keepalive operations if possible
	tcpConn, _ := c.(*net.TCPConn)

	return &sharedLimitListenerConn{
		Conn:    c,
		tcpConn: tcpConn,
		release: l.release,
	}, nil
}

func (l *sharedLimitListener) Close() error {
	err := l.Listener.Close()
	l.closeOnce.Do(func() { close(l.done) })
	return err
}

type sharedLimitListenerConn struct {
	net.Conn
	tcpConn     *net.TCPConn
	releaseOnce sync.Once
	release     func()
}

func (c *sharedLimitListenerConn) Close() error {
	err := c.Conn.Close()
	c.releaseOnce.Do(c.release)
	return err
}

func (c *sharedLimitListenerConn) SetKeepAlive(enabled bool) error {
	if c.tcpConn == nil {
		return errKeepaliveNotSupported
	}

	return c.tcpConn.SetKeepAlive(enabled)
}

func (c *sharedLimitListenerConn) SetKeepAlivePeriod(period time.Duration) error {
	if c.tcpConn == nil {
		return errKeepaliveNotSupported
	}

	return c.tcpConn.SetKeepAlivePeriod(period)
}