Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summary refs log tree commit diff
diff options
context:
space:
mode:
author Pavlo Strokov <pstrokov@gitlab.com> 2020-10-02 18:47:43 +0300
committer Toon Claes <toon@gitlab.com> 2020-10-02 18:47:43 +0300
commit b9dd1f6d4868f8463f689cf74f438e0108691af2 (patch)
tree e223460f5a250d9a5fe28785835314d2b487a846
parent d76d10b540d474d950103aa3303a89a9aad9dd88 (diff)
PostgreSQL notifications listener
Praefect strongly depends on the state of the PostgreSQL database. Usually we execute a query to get the latest information about the state. But that doesn't scale well when the query is heavy or there are too many round trips to the database. To address this, the data could be cached locally in memory, so we can reduce the number of trips to the database. The downside of this approach is that we must invalidate the local cache when data in the database changes. To address this we decided to allow Praefect to have a direct connection to the PostgreSQL database and benefit from its LISTEN/NOTIFY functionality. The implemented listener triggers the passed-in callback function with the payload it receives from the database. In order to prevent stale state, if the connection is lost and can't be re-established within 3 attempts in a period of 1 min, the listener will return with an error. Part of: https://gitlab.com/gitlab-org/gitaly/-/issues/3053
-rw-r--r-- changelogs/unreleased/ps-postgres-listener.yml 5
-rw-r--r-- go.mod 1
-rw-r--r-- internal/praefect/datastore/glsql/postgres.go 14
-rw-r--r-- internal/praefect/datastore/init_test.go 7
-rw-r--r-- internal/praefect/datastore/listener_postgres.go 248
-rw-r--r-- internal/praefect/datastore/listener_postgres_test.go 402
6 files changed, 675 insertions, 2 deletions
diff --git a/changelogs/unreleased/ps-postgres-listener.yml b/changelogs/unreleased/ps-postgres-listener.yml
new file mode 100644
index 000000000..38ce4a36b
--- /dev/null
+++ b/changelogs/unreleased/ps-postgres-listener.yml
@@ -0,0 +1,5 @@
+---
+title: PostgreSQL notifications listener
+merge_request: 2603
+author:
+type: added
diff --git a/go.mod b/go.mod
index 87e5732db..651af3047 100644
--- a/go.mod
+++ b/go.mod
@@ -15,6 +15,7 @@ require (
github.com/olekukonko/tablewriter v0.0.2
github.com/opentracing/opentracing-go v1.0.2
github.com/prometheus/client_golang v1.0.0
+ github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4
github.com/prometheus/procfs v0.0.3 // indirect
github.com/rubenv/sql-migrate v0.0.0-20191213152630-06338513c237
github.com/sirupsen/logrus v1.6.0
diff --git a/internal/praefect/datastore/glsql/postgres.go b/internal/praefect/datastore/glsql/postgres.go
index 4b474c179..465275299 100644
--- a/internal/praefect/datastore/glsql/postgres.go
+++ b/internal/praefect/datastore/glsql/postgres.go
@@ -41,6 +41,20 @@ type Querier interface {
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
}
+// ListenHandler contains a set of methods that would be called on corresponding notifications received.
+type ListenHandler interface {
+ // Notification would be triggered once a new notification received.
+ Notification(payload string)
+ // Disconnect would be triggered once a connection to remote service is lost.
+ Disconnect()
+}
+
+// Listener listens for events that occur in the system.
+type Listener interface {
+ // Listen is a blocking call that triggers a passed in handler for the events that appear in the system.
+ Listen(ctx context.Context, handler ListenHandler) error
+}
+
// TxQuery runs operations inside transaction and commits|rollbacks on Done.
type TxQuery interface {
// Exec calls op function with provided ctx.
diff --git a/internal/praefect/datastore/init_test.go b/internal/praefect/datastore/init_test.go
index a9601791f..b793b35af 100644
--- a/internal/praefect/datastore/init_test.go
+++ b/internal/praefect/datastore/init_test.go
@@ -20,5 +20,8 @@ func TestMain(m *testing.M) {
os.Exit(code)
}
-func getDB(t testing.TB) glsql.DB { return glsql.GetDB(t, "datastore") }
-func getDBConfig(t testing.TB) config.DB { return glsql.GetDBConfig(t, "datastore") }
+const databaseName = "datastore"
+
+func getDB(t testing.TB) glsql.DB { return glsql.GetDB(t, databaseName) }
+
+func getDBConfig(t testing.TB) config.DB { return glsql.GetDBConfig(t, databaseName) }
diff --git a/internal/praefect/datastore/listener_postgres.go b/internal/praefect/datastore/listener_postgres.go
new file mode 100644
index 000000000..49cb8c13a
--- /dev/null
+++ b/internal/praefect/datastore/listener_postgres.go
@@ -0,0 +1,248 @@
+package datastore
+
+import (
+ "context"
+ "fmt"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/lib/pq"
+ promclient "github.com/prometheus/client_golang/prometheus"
+ "gitlab.com/gitlab-org/gitaly/internal/dontpanic"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
+)
+
+// PostgresListenerOpts is a set of configuration options for the PostgreSQL listener.
+type PostgresListenerOpts struct {
+ Addr string
+ Channel string
+ PingPeriod time.Duration
+ MinReconnectInterval time.Duration
+ MaxReconnectInterval time.Duration
+ DisconnectThreshold int
+ DisconnectTimeWindow time.Duration
+ ConnectAttemptThreshold int
+ ConnectAttemptTimeWindow time.Duration
+}
+
+// DefaultPostgresListenerOpts pre-defined options for PostgreSQL listener.
+var DefaultPostgresListenerOpts = PostgresListenerOpts{
+ PingPeriod: 10 * time.Second,
+ MinReconnectInterval: 5 * time.Second,
+ MaxReconnectInterval: 40 * time.Second,
+ DisconnectThreshold: 3,
+ DisconnectTimeWindow: time.Minute,
+ ConnectAttemptThreshold: 3,
+ ConnectAttemptTimeWindow: time.Minute,
+}
+
+// PostgresListener is an implementation based on the PostgreSQL LISTEN/NOTIFY functions.
+type PostgresListener struct {
+ mtx sync.Mutex
+ listener *pq.Listener
+ opts PostgresListenerOpts
+ closed chan struct{}
+ err error
+ reconnectTotal *promclient.CounterVec
+}
+
+// NewPostgresListener returns a new instance of the listener.
+func NewPostgresListener(opts PostgresListenerOpts) (*PostgresListener, error) {
+ switch {
+ case strings.TrimSpace(opts.Addr) == "":
+ return nil, fmt.Errorf("address is invalid: %q", opts.Addr)
+ case strings.TrimSpace(opts.Channel) == "":
+ return nil, fmt.Errorf("channel is invalid: %q", opts.Channel)
+ case opts.PingPeriod < 0:
+ return nil, fmt.Errorf("invalid ping period: %s", opts.PingPeriod)
+ case opts.MinReconnectInterval <= 0:
+ return nil, fmt.Errorf("invalid min reconnect period: %s", opts.MinReconnectInterval)
+ case opts.MaxReconnectInterval <= 0 || opts.MaxReconnectInterval < opts.MinReconnectInterval:
+ return nil, fmt.Errorf("invalid max reconnect period: %s", opts.MaxReconnectInterval)
+ }
+
+ return &PostgresListener{
+ opts: opts,
+ closed: make(chan struct{}),
+ reconnectTotal: promclient.NewCounterVec(
+ promclient.CounterOpts{
+ Name: "gitaly_praefect_notifications_reconnects_total",
+ Help: "Counts amount of reconnects to listen for notification from PostgreSQL",
+ },
+ []string{"state"},
+ ),
+ }, nil
+}
+
+func (pgl *PostgresListener) Listen(ctx context.Context, handler glsql.ListenHandler) error {
+ if err := pgl.initListener(handler); err != nil {
+ return err
+ }
+
+ pgl.handleNotifications(ctx, handler)
+
+ pgl.mtx.Lock()
+ defer pgl.mtx.Unlock()
+
+ return pgl.err
+}
+
+func listenerEventTypeToString(et pq.ListenerEventType) string {
+ switch et {
+ case pq.ListenerEventConnected:
+ return "connected"
+ case pq.ListenerEventDisconnected:
+ return "disconnected"
+ case pq.ListenerEventReconnected:
+ return "reconnected"
+ case pq.ListenerEventConnectionAttemptFailed:
+ return "connection_attempt_failed"
+ }
+ return fmt.Sprintf("unknown: %d", et)
+}
+
+func (pgl *PostgresListener) initListener(handler glsql.ListenHandler) error {
+ pgl.mtx.Lock()
+ defer pgl.mtx.Unlock()
+
+ if pgl.listener != nil {
+ return fmt.Errorf("already listening channel %q of %q", pgl.opts.Channel, pgl.opts.Addr)
+ }
+ pgl.err = nil
+
+ initialization := int32(1)
+ defer atomic.StoreInt32(&initialization, 0)
+
+ disconnectThreshold := threshold(pgl.opts.DisconnectThreshold, pgl.opts.DisconnectTimeWindow)
+ connectionAttemptFailedThreshold := threshold(pgl.opts.ConnectAttemptThreshold, pgl.opts.ConnectAttemptTimeWindow)
+
+ connectionLifecycle := func(eventType pq.ListenerEventType, err error) {
+ pgl.reconnectTotal.WithLabelValues(listenerEventTypeToString(eventType)).Inc()
+
+ switch eventType {
+ case pq.ListenerEventDisconnected:
+ dontpanic.Try(handler.Disconnect)
+ if disconnectThreshold() {
+ pgl.close(atomic.LoadInt32(&initialization) == 0, err)
+ }
+ case pq.ListenerEventConnectionAttemptFailed:
+ if connectionAttemptFailedThreshold() {
+ pgl.close(atomic.LoadInt32(&initialization) == 0, err)
+ }
+ }
+ }
+
+ pgl.listener = pq.NewListener(pgl.opts.Addr, pgl.opts.MinReconnectInterval, pgl.opts.MaxReconnectInterval, connectionLifecycle)
+
+ if err := pgl.listener.Listen(pgl.opts.Channel); err != nil {
+ return fmt.Errorf("listening for %q: %w", pgl.opts.Channel, err)
+ }
+
+ return pgl.err
+}
+
+func (pgl *PostgresListener) handleNotifications(ctx context.Context, handler glsql.ListenHandler) {
+ closed := pgl.closed
+ notify := pgl.listener.Notify
+
+ for {
+ select {
+ case <-closed:
+ return
+ case <-ctx.Done():
+ pgl.close(true, nil)
+ return
+ case <-time.After(pgl.opts.PingPeriod):
+ if err := pgl.listener.Ping(); err != nil {
+ pgl.close(true, err)
+ return
+ }
+ case notification := <-notify:
+ if notification == nil {
+ select {
+ case <-closed:
+ return
+ default:
+ continue
+ }
+ }
+
+ dontpanic.Try(func() { handler.Notification(notification.Extra) })
+ }
+ }
+}
+
+func (pgl *PostgresListener) close(lock bool, err error) {
+ if lock {
+ // we should not lock if close was called during initialisation step as lock is acquired by initListener already
+ pgl.mtx.Lock()
+ defer pgl.mtx.Unlock()
+ }
+
+ if pgl.listener == nil {
+ return
+ }
+
+ pgl.err = err
+
+ uerr := pgl.listener.UnlistenAll()
+ cerr := pgl.listener.Close()
+
+ if pgl.err == nil {
+ if uerr != nil {
+ pgl.err = uerr
+ } else {
+ pgl.err = cerr
+ }
+ }
+
+ close(pgl.closed)
+ pgl.closed = make(chan struct{})
+ pgl.listener = nil
+}
+
+func (pgl *PostgresListener) Describe(descs chan<- *promclient.Desc) {
+ promclient.DescribeByCollect(pgl, descs)
+}
+
+func (pgl *PostgresListener) Collect(metrics chan<- promclient.Metric) {
+ pgl.reconnectTotal.Collect(metrics)
+}
+
+// threshold returns a function each call of which returns a flag if
+// passed in threshold is reached in window time duration.
+// If threshold=3 and window=1s and function called 3 times within 1s
+// time interval then the last call will return true. This is a signal
+// that the threshold of invocation was reached in a configured time window.
+func threshold(threshold int, window time.Duration) func() bool {
+ // contains timestamps of the function invocation in [oldest->newest] order
+ var triggeredAt []time.Time
+ var mtx sync.Mutex
+
+ return func() bool {
+ if threshold == 0 {
+ return true
+ }
+
+ mtx.Lock()
+ defer mtx.Unlock()
+
+ now := time.Now()
+ triggeredAt = append(triggeredAt, now)
+
+ if len(triggeredAt) < threshold {
+ return false
+ }
+
+ triggeredAt = triggeredAt[len(triggeredAt)-threshold:] // this might be suboptimal when len(triggeredAt) = threshold
+
+ for _, t := range triggeredAt {
+ if now.Sub(t) > window {
+ return false
+ }
+ }
+ return true
+ }
+}
diff --git a/internal/praefect/datastore/listener_postgres_test.go b/internal/praefect/datastore/listener_postgres_test.go
new file mode 100644
index 000000000..5a740b562
--- /dev/null
+++ b/internal/praefect/datastore/listener_postgres_test.go
@@ -0,0 +1,402 @@
+// +build postgres
+
+package datastore
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ promclient "github.com/prometheus/client_golang/prometheus"
+ promclientgo "github.com/prometheus/client_model/go"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+)
+
+func TestNewPostgresListener(t *testing.T) {
+ for title, tc := range map[string]struct {
+ opts PostgresListenerOpts
+ expErrMsg string
+ }{
+ "all set": {
+ opts: PostgresListenerOpts{
+ Addr: "stub",
+ Channel: "sting",
+ MinReconnectInterval: time.Second,
+ MaxReconnectInterval: time.Minute,
+ },
+ },
+ "invalid option: address": {
+ opts: PostgresListenerOpts{Addr: ""},
+ expErrMsg: "address is invalid",
+ },
+ "invalid option: channel": {
+ opts: PostgresListenerOpts{Addr: "stub", Channel: " "},
+ expErrMsg: "channel is invalid",
+ },
+ "invalid option: ping period": {
+ opts: PostgresListenerOpts{Addr: "stub", Channel: "stub", PingPeriod: -1},
+ expErrMsg: "invalid ping period",
+ },
+ "invalid option: min reconnect period": {
+ opts: PostgresListenerOpts{Addr: "stub", Channel: "stub", MinReconnectInterval: 0},
+ expErrMsg: "invalid min reconnect period",
+ },
+ "invalid option: max reconnect period": {
+ opts: PostgresListenerOpts{Addr: "stub", Channel: "stub", MinReconnectInterval: time.Second, MaxReconnectInterval: time.Millisecond},
+ expErrMsg: "invalid max reconnect period",
+ },
+ } {
+ t.Run(title, func(t *testing.T) {
+ pgl, err := NewPostgresListener(tc.opts)
+ if tc.expErrMsg != "" {
+ require.Error(t, err)
+ require.Contains(t, err.Error(), tc.expErrMsg)
+ return
+ }
+ require.NoError(t, err)
+ require.NotNil(t, pgl)
+ })
+ }
+}
+
+type mockListenHandler struct {
+ OnNotification func(string)
+ OnDisconnect func()
+}
+
+func (mlh mockListenHandler) Notification(v string) {
+ if mlh.OnNotification != nil {
+ mlh.OnNotification(v)
+ }
+}
+
+func (mlh mockListenHandler) Disconnect() {
+ if mlh.OnDisconnect != nil {
+ mlh.OnDisconnect()
+ }
+}
+
+func TestPostgresListener_Listen(t *testing.T) {
+ db := getDB(t)
+
+ newOpts := func() PostgresListenerOpts {
+ opts := DefaultPostgresListenerOpts
+ opts.Addr = getDBConfig(t).ToPQString(true)
+ opts.Channel = fmt.Sprintf("channel_%d", time.Now().UnixNano())
+ opts.MinReconnectInterval = time.Nanosecond
+ opts.MaxReconnectInterval = time.Second
+ return opts
+ }
+
+ notifyListener := func(t *testing.T, channelName, payload string) {
+ t.Helper()
+
+ _, err := db.Exec(fmt.Sprintf(`NOTIFY %s, '%s'`, channelName, payload))
+ assert.NoError(t, err)
+ }
+
+ listenNotify := func(t *testing.T, opts PostgresListenerOpts, numNotifiers int, payloads []string) (*PostgresListener, []string) {
+ t.Helper()
+
+ pgl, err := NewPostgresListener(opts)
+
+ require.NoError(t, err)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ numResults := len(payloads) * numNotifiers
+ allReceivedChan := make(chan struct{})
+
+ go func() {
+ defer cancel()
+
+ time.Sleep(100 * time.Millisecond)
+
+ var wg sync.WaitGroup
+ wg.Add(numNotifiers)
+ for i := 0; i < numNotifiers; i++ {
+ go func() {
+ defer wg.Done()
+
+ for _, payload := range payloads {
+ notifyListener(t, opts.Channel, payload)
+ }
+ }()
+ }
+ wg.Wait()
+
+ select {
+ case <-time.After(time.Second):
+ assert.FailNow(t, "notification propagation takes too long")
+ case <-allReceivedChan:
+ }
+ }()
+
+ result := make([]string, numResults)
+ idx := int32(-1)
+ callback := func(payload string) {
+ i := int(atomic.AddInt32(&idx, 1))
+ result[i] = payload
+ if i+1 == numResults {
+ close(allReceivedChan)
+ }
+ }
+
+ require.NoError(t, pgl.Listen(ctx, mockListenHandler{OnNotification: callback}))
+
+ return pgl, result
+ }
+
+ disconnectListener := func(t *testing.T, channelName string) {
+ t.Helper()
+
+ q := `SELECT PG_TERMINATE_BACKEND(pid) FROM PG_STAT_ACTIVITY WHERE datname = $1 AND query = $2`
+ res, err := db.Exec(q, databaseName, fmt.Sprintf("LISTEN %q", channelName))
+ if assert.NoError(t, err) {
+ affected, err := res.RowsAffected()
+ assert.NoError(t, err)
+ assert.EqualValues(t, 1, affected)
+ }
+ }
+
+ readMetrics := func(t *testing.T, col promclient.Collector) []promclientgo.Metric {
+ t.Helper()
+
+ metricsChan := make(chan promclient.Metric, 16)
+ col.Collect(metricsChan)
+ close(metricsChan)
+ var metric []promclientgo.Metric
+ for m := range metricsChan {
+ var mtc promclientgo.Metric
+ assert.NoError(t, m.Write(&mtc))
+ metric = append(metric, mtc)
+ }
+ return metric
+ }
+
+ t.Run("single processor and single notifier", func(t *testing.T) {
+ opts := newOpts()
+
+ payloads := []string{"this", "is", "a", "payload"}
+
+ listener, result := listenNotify(t, opts, 1, payloads)
+ require.Equal(t, payloads, result)
+
+ metrics := readMetrics(t, listener.reconnectTotal)
+ require.Len(t, metrics, 1)
+ require.Len(t, metrics[0].Label, 1)
+ require.Equal(t, "state", *metrics[0].Label[0].Name)
+ require.Equal(t, "connected", *metrics[0].Label[0].Value)
+ require.GreaterOrEqual(t, *metrics[0].Counter.Value, 1.0)
+ })
+
+ t.Run("single processor and multiple notifiers", func(t *testing.T) {
+ opts := newOpts()
+
+ numNotifiers := 10
+
+ payloads := []string{"this", "is", "a", "payload"}
+ var expResult []string
+ for i := 0; i < numNotifiers; i++ {
+ expResult = append(expResult, payloads...)
+ }
+
+ _, result := listenNotify(t, opts, numNotifiers, payloads)
+ assert.ElementsMatch(t, expResult, result, "there must be no additional data, only expected")
+ })
+
+ t.Run("re-listen", func(t *testing.T) {
+ opts := newOpts()
+ listener, result := listenNotify(t, opts, 1, []string{"1"})
+ require.Equal(t, []string{"1"}, result)
+
+ ctx, cancel := testhelper.Context()
+
+ var connected int32
+
+ errCh := make(chan error, 1)
+ go func() {
+ errCh <- listener.Listen(ctx, mockListenHandler{OnNotification: func(payload string) {
+ atomic.StoreInt32(&connected, 1)
+ assert.Equal(t, "2", payload)
+ }})
+ }()
+
+ for atomic.LoadInt32(&connected) == 0 {
+ notifyListener(t, opts.Channel, "2")
+ }
+
+ cancel()
+ err := <-errCh
+ require.NoError(t, err)
+ })
+
+ t.Run("already listening", func(t *testing.T) {
+ opts := newOpts()
+
+ listener, err := NewPostgresListener(opts)
+ require.NoError(t, err)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ var connected int32
+
+ errCh := make(chan error, 1)
+ go func() {
+ errCh <- listener.Listen(ctx, mockListenHandler{OnNotification: func(payload string) {
+ atomic.StoreInt32(&connected, 1)
+ assert.Equal(t, "2", payload)
+ }})
+ }()
+
+ for atomic.LoadInt32(&connected) == 0 {
+ notifyListener(t, opts.Channel, "2")
+ }
+
+ err = listener.Listen(ctx, mockListenHandler{})
+ require.Error(t, err)
+ require.Equal(t, fmt.Sprintf(`already listening channel %q of %q`, opts.Channel, opts.Addr), err.Error())
+ })
+
+ t.Run("invalid connection", func(t *testing.T) {
+ opts := newOpts()
+ opts.Addr = "invalid-address"
+
+ listener, err := NewPostgresListener(opts)
+ require.NoError(t, err)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ err = listener.Listen(ctx, mockListenHandler{OnNotification: func(string) {
+ assert.FailNow(t, "no notifications expected to be received")
+ }})
+ require.Error(t, err, "it should not be possible to start listening on invalid connection")
+ })
+
+ t.Run("connection interruption", func(t *testing.T) {
+ opts := newOpts()
+ listener, err := NewPostgresListener(opts)
+ require.NoError(t, err)
+
+ ctx, cancel := testhelper.Context(testhelper.ContextWithTimeout(time.Second))
+ defer cancel()
+
+ var connected int32
+
+ errChan := make(chan error, 1)
+ go func() {
+ errChan <- listener.Listen(ctx, mockListenHandler{OnNotification: func(string) {
+ atomic.StoreInt32(&connected, 1)
+ }})
+ }()
+
+ for atomic.LoadInt32(&connected) == 0 {
+ notifyListener(t, opts.Channel, "")
+ }
+
+ disconnectListener(t, opts.Channel)
+ atomic.StoreInt32(&connected, 0)
+
+ for atomic.LoadInt32(&connected) == 0 {
+ notifyListener(t, opts.Channel, "")
+ }
+
+ cancel()
+ require.NoError(t, <-errChan)
+ })
+
+ t.Run("persisted connection interruption", func(t *testing.T) {
+ opts := newOpts()
+ listener, err := NewPostgresListener(opts)
+ require.NoError(t, err)
+
+ ctx, cancel := testhelper.Context(testhelper.ContextWithTimeout(time.Second))
+ defer cancel()
+
+ var connected int32
+ var disconnected int32
+
+ errChan := make(chan error, 1)
+ go func() {
+ errChan <- listener.Listen(
+ ctx,
+ mockListenHandler{
+ OnNotification: func(string) { atomic.StoreInt32(&connected, 1) },
+ OnDisconnect: func() { atomic.AddInt32(&disconnected, 1) },
+ })
+ }()
+
+ for i := 0; i < opts.DisconnectThreshold; i++ {
+ for atomic.LoadInt32(&connected) == 0 {
+ notifyListener(t, opts.Channel, "")
+ }
+
+ disconnectListener(t, opts.Channel)
+ atomic.StoreInt32(&connected, 0)
+ }
+
+ err = <-errChan
+ require.Error(t, err)
+ require.False(t, errors.Is(err, context.DeadlineExceeded), "listener was blocked for too long")
+
+ metrics := readMetrics(t, listener.reconnectTotal)
+ for _, metric := range metrics {
+ switch *metric.Label[0].Value {
+ case "connected":
+ require.GreaterOrEqual(t, *metric.Counter.Value, 1.0)
+ case "disconnected":
+ require.EqualValues(t, disconnected, *metric.Counter.Value)
+ case "reconnected":
+ require.GreaterOrEqual(t, *metric.Counter.Value, 2.0)
+ }
+ }
+ })
+}
+
+func TestThreshold(t *testing.T) {
+ t.Run("reaches as there are no pauses between the calls", func(t *testing.T) {
+ thresholdReached := threshold(100, time.Hour)
+
+ for i := 0; i < 99; i++ {
+ require.False(t, thresholdReached())
+ }
+ require.True(t, thresholdReached())
+ })
+
+ t.Run("doesn't reach because of pauses between the calls", func(t *testing.T) {
+ thresholdReached := threshold(2, time.Microsecond)
+
+ require.False(t, thresholdReached())
+ time.Sleep(time.Millisecond)
+ require.False(t, thresholdReached())
+ })
+
+ t.Run("reaches only on 6-th call because of the pause after first check", func(t *testing.T) {
+ thresholdReached := threshold(5, time.Millisecond)
+
+ require.False(t, thresholdReached())
+ time.Sleep(time.Millisecond)
+ require.False(t, thresholdReached())
+ require.False(t, thresholdReached())
+ require.False(t, thresholdReached())
+ require.False(t, thresholdReached())
+ require.True(t, thresholdReached())
+ })
+
+ t.Run("always reached for zero values", func(t *testing.T) {
+ thresholdReached := threshold(0, 0)
+
+ require.True(t, thresholdReached())
+ time.Sleep(time.Millisecond)
+ require.True(t, thresholdReached())
+ })
+}