Welcome to mirror list, hosted at ThFree Co, Russian Federation.

listener.go « datastore « praefect « internal - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 34a145b3cdf3a7dddf50fed36ccdec45322e6121 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package datastore

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"time"

	"github.com/jackc/pgx/v4"
	promclient "github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
	"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
)

const (
	// StorageRepositoriesUpdatesChannel is the name of the database event channel
	// used to send events about changes made to the 'storage_repositories' table.
	StorageRepositoriesUpdatesChannel = "storage_repositories_updates"
	// RepositoriesUpdatesChannel is the name of the database event channel
	// used to send events about changes made to the 'repositories' table.
	RepositoriesUpdatesChannel = "repositories_updates"
)

// Listener listens for PostgreSQL database NOTIFY events.
// Calling its Listen method opens a connection to the database and starts
// receiving events on it.
type Listener struct {
	// pgxCfg is the parsed connection configuration Listen uses to connect.
	pgxCfg *pgx.ConnConfig
}

// NewListener builds a Listener from the database configuration. It fails only
// when the connection config cannot be parsed; the connection itself is
// established later by Listen.
func NewListener(conf config.DB) (*Listener, error) {
	dsn := glsql.DSN(conf, true)

	connCfg, err := pgx.ParseConfig(dsn)
	if err != nil {
		return nil, fmt.Errorf("connection config preparation: %w", err)
	}

	lis := &Listener{pgxCfg: connCfg}
	return lis, nil
}

// Listen connects to the database, subscribes to the given channels and forwards
// every received notification to the handler. It is a blocking call: it returns
// when the context is cancelled or when receiving a notification fails, and the
// returned error is always non-nil.
func (l *Listener) Listen(ctx context.Context, handler glsql.ListenHandler, channels ...string) error {
	conn, err := pgx.ConnectConfig(ctx, l.pgxCfg)
	if err != nil {
		return fmt.Errorf("connect: %w", err)
	}

	defer func() {
		// Closing uses its own context with a bounded timeout so that the
		// service does not hang on termination.
		closeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		// Nothing useful can be done if closing fails, so the error is dropped.
		_ = conn.Close(closeCtx)
	}()

	_, err = conn.Exec(ctx, compileListenQuery(channels))
	if err != nil {
		return fmt.Errorf("listen on channel(s): %w", err)
	}

	// The subscription is in place; let the handler know before waiting.
	handler.Connected()

	for {
		received, waitErr := conn.WaitForNotification(ctx)
		if waitErr != nil {
			handler.Disconnect(waitErr)
			return fmt.Errorf("wait for notification: %w", waitErr)
		}

		event := glsql.Notification{Channel: received.Channel, Payload: received.Payload}
		handler.Notification(event)
	}
}

// compileListenQuery returns a single SQL string made of one "listen <channel>;"
// statement per channel, in the order given. Channel names are inserted verbatim.
// An empty list of channels yields an empty string.
func compileListenQuery(channels []string) string {
	var query strings.Builder
	for _, name := range channels {
		fmt.Fprintf(&query, "listen %s;", name)
	}
	return query.String()
}

// metricsHandlerMiddleware wraps a glsql.ListenHandler and counts connection
// state changes before delegating to the wrapped handler.
type metricsHandlerMiddleware struct {
	// ListenHandler is the wrapped handler; it is embedded, so any method not
	// overridden on this type is promoted from it.
	glsql.ListenHandler
	// counter is incremented with a state label value in Connected and Disconnect.
	counter *promclient.CounterVec
}

// Connected increments the counter labelled "connected" and then forwards the
// call to the wrapped handler.
func (mh metricsHandlerMiddleware) Connected() {
	connected := mh.counter.WithLabelValues("connected")
	connected.Inc()

	mh.ListenHandler.Connected()
}

// Disconnect increments the counter labelled "disconnected" and then forwards
// err to the wrapped handler.
func (mh metricsHandlerMiddleware) Disconnect(err error) {
	disconnected := mh.counter.WithLabelValues("disconnected")
	disconnected.Inc()

	mh.ListenHandler.Disconnect(err)
}

// ResilientListener listens for notifications resiliently: when listening is
// interrupted by an error, it waits on its ticker and then starts a new Listener.
// It holds the database configuration used to create each Listener, the logger
// used to report interruptions and a counter of connection state changes that
// is exposed through Describe and Collect.
type ResilientListener struct {
	conf           config.DB
	ticker         helper.Ticker
	logger         logrus.FieldLogger
	reconnectTotal *promclient.CounterVec
}

// NewResilientListener creates a *ResilientListener that uses conf to create
// listeners, waits on ticker between attempts and logs through logger with the
// "component" field set to "resilient_listener".
func NewResilientListener(conf config.DB, ticker helper.Ticker, logger logrus.FieldLogger) *ResilientListener {
	// Counter of connection state changes, partitioned by the "state" label.
	reconnects := promclient.NewCounterVec(
		promclient.CounterOpts{
			Name: "gitaly_praefect_notifications_reconnects_total",
			Help: "Counts amount of reconnects to listen for notification from PostgreSQL",
		},
		[]string{"state"},
	)

	rl := &ResilientListener{
		conf:           conf,
		ticker:         ticker,
		logger:         logger.WithField("component", "resilient_listener"),
		reconnectTotal: reconnects,
	}
	return rl
}

// Listen runs a Listener on the given channels and replaces it with a new one
// whenever listening is interrupted, waiting for the ticker between attempts.
// It returns when the context is cancelled or its deadline is exceeded, or when
// a Listener cannot be created from the configuration.
func (rl *ResilientListener) Listen(ctx context.Context, handler glsql.ListenHandler, channels ...string) error {
	defer rl.ticker.Stop()

	// Every attempt reports connects and disconnects through the same counter.
	instrumented := metricsHandlerMiddleware{ListenHandler: handler, counter: rl.reconnectTotal}

	for {
		lis, err := NewListener(rl.conf)
		if err != nil {
			return err
		}

		listenErr := lis.Listen(ctx, instrumented, channels...)
		switch {
		case listenErr == nil:
			// Nothing to report; continue to the wait below.
		case errors.Is(listenErr, context.Canceled), errors.Is(listenErr, context.DeadlineExceeded):
			// The context is done, so another attempt would be pointless.
			return listenErr
		default:
			rl.logger.WithError(listenErr).
				WithField("channels", channels).
				Error("listening was interrupted")
		}

		// Wait for the next tick before reconnecting, unless the context ends first.
		rl.ticker.Reset()
		select {
		case <-rl.ticker.C():
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// Describe sends the descriptors of the metrics produced by Collect to descs.
func (rl *ResilientListener) Describe(descs chan<- *promclient.Desc) {
	promclient.DescribeByCollect(rl, descs)
}

// Collect sends the metrics of the reconnect counter gathered during execution to metrics.
func (rl *ResilientListener) Collect(metrics chan<- promclient.Metric) {
	rl.reconnectTotal.Collect(metrics)
}