Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--NOTICE11
-rw-r--r--go.mod1
-rw-r--r--internal/praefect/datastore/listener_postgres.go237
-rw-r--r--internal/praefect/datastore/listener_postgres_test.go558
-rw-r--r--internal/praefect/datastore/listener_test.go42
5 files changed, 42 insertions, 807 deletions
diff --git a/NOTICE b/NOTICE
index 27a193694..f594d134f 100644
--- a/NOTICE
+++ b/NOTICE
@@ -12447,17 +12447,6 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-LICENSE.md - github.com/lib/pq
-Copyright (c) 2011-2013, 'pq' Contributors
-Portions Copyright (C) 2011 Blake Mizerany
-
-Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
-
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
LICENSE - github.com/libgit2/git2go/v32
The MIT License
diff --git a/go.mod b/go.mod
index e9a4dcd9f..09843055a 100644
--- a/go.mod
+++ b/go.mod
@@ -26,7 +26,6 @@ require (
github.com/jackc/pgtype v1.9.1
github.com/jackc/pgx/v4 v4.14.1
github.com/kelseyhightower/envconfig v1.3.0
- github.com/lib/pq v1.10.2
github.com/libgit2/git2go/v32 v32.0.5
github.com/olekukonko/tablewriter v0.0.2
github.com/opencontainers/runtime-spec v1.0.2
diff --git a/internal/praefect/datastore/listener_postgres.go b/internal/praefect/datastore/listener_postgres.go
deleted file mode 100644
index 53d1f1f75..000000000
--- a/internal/praefect/datastore/listener_postgres.go
+++ /dev/null
@@ -1,237 +0,0 @@
-package datastore
-
-import (
- "errors"
- "fmt"
- "strings"
- "sync"
- "time"
-
- "github.com/lib/pq"
- promclient "github.com/prometheus/client_golang/prometheus"
- "github.com/sirupsen/logrus"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
-)
-
-// PostgresListenerOpts is a set of configuration options for the PostgreSQL listener.
-type PostgresListenerOpts struct {
- // Addr is an address to database instance.
- Addr string
- // Channels is a list of channel to listen for notifications.
- Channels []string
- // PingPeriod is a period to wait before executing a pin call on the connection to verify if it is still healthy.
- PingPeriod time.Duration
- // MinReconnectInterval controls the duration to wait before trying to
- // re-establish the database connection after connection loss.
- MinReconnectInterval time.Duration
- // MaxReconnectInterval is a max interval to wait until successful connection establishment.
- MaxReconnectInterval time.Duration
-}
-
-// DefaultPostgresListenerOpts pre-defined options for PostgreSQL listener.
-var DefaultPostgresListenerOpts = PostgresListenerOpts{
- PingPeriod: 10 * time.Second,
- MinReconnectInterval: 5 * time.Second,
- MaxReconnectInterval: 40 * time.Second,
-}
-
-// PostgresListener is an implementation based on the PostgreSQL LISTEN/NOTIFY functions.
-type PostgresListener struct {
- logger logrus.FieldLogger
- listener *pq.Listener
- handler glsql.ListenHandler
- opts PostgresListenerOpts
- closed chan struct{}
- reconnectTotal *promclient.CounterVec
- wg sync.WaitGroup
-}
-
-// NewPostgresListener returns a new instance of the listener.
-func NewPostgresListener(logger logrus.FieldLogger, opts PostgresListenerOpts, handler glsql.ListenHandler) (*PostgresListener, error) {
- switch {
- case strings.TrimSpace(opts.Addr) == "":
- return nil, fmt.Errorf("address is invalid: %q", opts.Addr)
- case len(opts.Channels) == 0:
- return nil, errors.New("no channels to listen")
- case opts.PingPeriod < 0:
- return nil, fmt.Errorf("invalid ping period: %s", opts.PingPeriod)
- case opts.MinReconnectInterval <= 0:
- return nil, fmt.Errorf("invalid min reconnect period: %s", opts.MinReconnectInterval)
- case opts.MaxReconnectInterval <= 0 || opts.MaxReconnectInterval < opts.MinReconnectInterval:
- return nil, fmt.Errorf("invalid max reconnect period: %s", opts.MaxReconnectInterval)
- }
-
- pgl := &PostgresListener{
- logger: logger.WithField("component", "postgres_listener"),
- opts: opts,
- handler: handler,
- closed: make(chan struct{}),
- reconnectTotal: promclient.NewCounterVec(
- promclient.CounterOpts{
- Name: "gitaly_praefect_notifications_reconnects_total",
- Help: "Counts amount of reconnects to listen for notification from PostgreSQL",
- },
- []string{"state"},
- ),
- }
-
- if err := pgl.connect(); err != nil {
- if err := pgl.Close(); err != nil {
- pgl.logger.WithError(err).Error("releasing listener resources after failed to listen on it")
- }
- return nil, fmt.Errorf("connect: %w", err)
- }
-
- return pgl, nil
-}
-
-func (pgl *PostgresListener) connect() error {
- firstConnectionAttempt := true
- connectErrChan := make(chan error, 1)
- listenerAssignedChan := make(chan struct{})
-
- connectionLifecycle := func(eventType pq.ListenerEventType, err error) {
- pgl.reconnectTotal.WithLabelValues(listenerEventTypeToString(eventType)).Inc()
-
- switch eventType {
- case pq.ListenerEventConnectionAttemptFailed:
- pgl.logger.WithError(err).Error(listenerEventTypeToString(eventType))
- if firstConnectionAttempt {
- firstConnectionAttempt = false
- // if a first attempt to establish a connection to a remote is failed
- // we should not proceed as it won't be possible to distinguish between
- // temporary errors and initialization errors like invalid
- // connection address.
- connectErrChan <- err
- }
- case pq.ListenerEventConnected:
- // once the connection is established we can be sure that the connection
- // address is correct and all other errors could be considered as
- // a temporary, so listener will try to re-connect and proceed.
- pgl.async(func() {
- <-listenerAssignedChan
- pgl.ping()
- })
- pgl.async(func() {
- <-listenerAssignedChan
- pgl.handleNotifications()
- })
-
- close(connectErrChan) // to signal the connection was established without troubles
- firstConnectionAttempt = false
-
- pgl.handler.Connected()
- case pq.ListenerEventReconnected:
- pgl.handler.Connected()
- case pq.ListenerEventDisconnected:
- pgl.logger.WithError(err).Error(listenerEventTypeToString(eventType))
- pgl.handler.Disconnect(err)
- }
- }
-
- pgl.listener = pq.NewListener(pgl.opts.Addr, pgl.opts.MinReconnectInterval, pgl.opts.MaxReconnectInterval, connectionLifecycle)
- close(listenerAssignedChan)
-
- listenErrChan := make(chan error, 1)
- pgl.async(func() {
- // we need to start channel listeners in a parallel, otherwise if a bad connection string provided
- // the connectionLifecycle callback will always receive pq.ListenerEventConnectionAttemptFailed event.
- // When a listener is added it will produce an error on attempt to use a connection and re-connection
- // loop will be interrupted.
- listenErrChan <- pgl.listen()
- })
-
- if err := <-connectErrChan; err != nil {
- return err
- }
-
- return <-listenErrChan
-}
-
-//nolint: revive,stylecheck // This is unintentionally missing documentation.
-func (pgl *PostgresListener) Close() error {
- defer func() {
- close(pgl.closed)
- pgl.wg.Wait()
- }()
- return pgl.listener.Close()
-}
-
-func listenerEventTypeToString(et pq.ListenerEventType) string {
- switch et {
- case pq.ListenerEventConnected:
- return "connected"
- case pq.ListenerEventDisconnected:
- return "disconnected"
- case pq.ListenerEventReconnected:
- return "reconnected"
- case pq.ListenerEventConnectionAttemptFailed:
- return "connection_attempt_failed"
- }
- return fmt.Sprintf("unknown: %d", et)
-}
-
-func (pgl *PostgresListener) listen() error {
- for _, channel := range pgl.opts.Channels {
- if err := pgl.listener.Listen(channel); err != nil {
- return err
- }
- }
- return nil
-}
-
-func (pgl *PostgresListener) handleNotifications() {
- for {
- select {
- case <-pgl.closed:
- return
- case notification, ok := <-pgl.listener.Notify:
- if !ok {
- // this happens when the Close is called on the listener
- return
- }
-
- if notification == nil {
- // this happens when pq.ListenerEventReconnected is emitted after a database
- // connection has been re-established after connection loss
- continue
- }
-
- pgl.handler.Notification(glsql.Notification{
- Channel: notification.Channel,
- Payload: notification.Extra,
- })
- }
- }
-}
-
-func (pgl *PostgresListener) ping() {
- for {
- select {
- case <-pgl.closed:
- return
- case <-time.After(pgl.opts.PingPeriod):
- if err := pgl.listener.Ping(); err != nil {
- pgl.logger.WithError(err).Error("health check ping failed")
- }
- }
- }
-}
-
-func (pgl *PostgresListener) async(f func()) {
- pgl.wg.Add(1)
- go func() {
- defer pgl.wg.Done()
- f()
- }()
-}
-
-//nolint: revive,stylecheck // This is unintentionally missing documentation.
-func (pgl *PostgresListener) Describe(descs chan<- *promclient.Desc) {
- promclient.DescribeByCollect(pgl, descs)
-}
-
-//nolint: revive,stylecheck // This is unintentionally missing documentation.
-func (pgl *PostgresListener) Collect(metrics chan<- promclient.Metric) {
- pgl.reconnectTotal.Collect(metrics)
-}
diff --git a/internal/praefect/datastore/listener_postgres_test.go b/internal/praefect/datastore/listener_postgres_test.go
deleted file mode 100644
index 26013ebd6..000000000
--- a/internal/praefect/datastore/listener_postgres_test.go
+++ /dev/null
@@ -1,558 +0,0 @@
-package datastore
-
-import (
- "encoding/json"
- "errors"
- "fmt"
- "sort"
- "strings"
- "sync"
- "testing"
- "time"
-
- "github.com/lib/pq"
- "github.com/prometheus/client_golang/prometheus/testutil"
- "github.com/sirupsen/logrus/hooks/test"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
- "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
- "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
-)
-
-func TestNewPostgresListener(t *testing.T) {
- for title, tc := range map[string]struct {
- opts PostgresListenerOpts
- handler glsql.ListenHandler
- expErrMsg string
- }{
- "invalid option: address": {
- opts: PostgresListenerOpts{Addr: ""},
- expErrMsg: "address is invalid",
- },
- "invalid option: channels": {
- opts: PostgresListenerOpts{Addr: "stub", Channels: nil},
- expErrMsg: "no channels to listen",
- },
- "invalid option: ping period": {
- opts: PostgresListenerOpts{Addr: "stub", Channels: []string{""}, PingPeriod: -1},
- expErrMsg: "invalid ping period",
- },
- "invalid option: min reconnect period": {
- opts: PostgresListenerOpts{Addr: "stub", Channels: []string{""}, MinReconnectInterval: 0},
- expErrMsg: "invalid min reconnect period",
- },
- "invalid option: max reconnect period": {
- opts: PostgresListenerOpts{Addr: "stub", Channels: []string{""}, MinReconnectInterval: time.Second, MaxReconnectInterval: time.Millisecond},
- expErrMsg: "invalid max reconnect period",
- },
- } {
- t.Run(title, func(t *testing.T) {
- pgl, err := NewPostgresListener(testhelper.NewDiscardingLogger(t), tc.opts, nil)
- if tc.expErrMsg != "" {
- require.Error(t, err)
- require.Contains(t, err.Error(), tc.expErrMsg)
- return
- }
- require.NoError(t, err)
- require.NotNil(t, pgl)
- })
- }
-}
-
-type mockListenHandler struct {
- OnNotification func(glsql.Notification)
- OnDisconnect func(error)
- OnConnected func()
-}
-
-func (mlh mockListenHandler) Notification(n glsql.Notification) {
- if mlh.OnNotification != nil {
- mlh.OnNotification(n)
- }
-}
-
-func (mlh mockListenHandler) Disconnect(err error) {
- if mlh.OnDisconnect != nil {
- mlh.OnDisconnect(err)
- }
-}
-
-func (mlh mockListenHandler) Connected() {
- if mlh.OnConnected != nil {
- mlh.OnConnected()
- }
-}
-
-func TestPostgresListener_Listen(t *testing.T) {
- t.Parallel()
- db := testdb.New(t)
-
- logger := testhelper.NewDiscardingLogger(t)
-
- newOpts := func() PostgresListenerOpts {
- opts := DefaultPostgresListenerOpts
- opts.Addr = glsql.DSN(testdb.GetConfig(t, db.Name), true)
- opts.MinReconnectInterval = time.Nanosecond
- opts.MaxReconnectInterval = time.Minute
- return opts
- }
-
- newChannel := func(i int) func() string {
- return func() string {
- i++
- return fmt.Sprintf("channel_%d", i)
- }
- }(0)
-
- notifyListener := func(t *testing.T, channelName, payload string) {
- t.Helper()
-
- _, err := db.Exec(fmt.Sprintf(`NOTIFY %s, '%s'`, channelName, payload))
- assert.NoError(t, err)
- }
-
- listenNotify := func(t *testing.T, opts PostgresListenerOpts, numNotifiers int, payloads []string) (*PostgresListener, []string) {
- t.Helper()
-
- start := make(chan struct{})
- done := make(chan struct{})
- defer func() { <-done }()
-
- numResults := len(payloads) * numNotifiers
- allReceivedChan := make(chan struct{})
-
- go func() {
- defer close(done)
-
- <-start
-
- var wg sync.WaitGroup
- for i := 0; i < numNotifiers; i++ {
- wg.Add(1)
- go func() {
- defer wg.Done()
- for _, payload := range payloads {
- for _, channel := range opts.Channels {
- notifyListener(t, channel, payload)
- }
- }
- }()
- }
- wg.Wait()
-
- select {
- case <-time.After(time.Second):
- assert.FailNow(t, "notification propagation takes too long")
- case <-allReceivedChan:
- }
- }()
-
- result := make([]string, numResults)
- callback := func(idx int) func(n glsql.Notification) {
- return func(n glsql.Notification) {
- idx++
- result[idx] = n.Payload
- if idx+1 == numResults {
- close(allReceivedChan)
- }
- }
- }(-1)
-
- handler := mockListenHandler{OnNotification: callback, OnConnected: func() { close(start) }}
- pgl, err := NewPostgresListener(logger, opts, handler)
- require.NoError(t, err)
-
- return pgl, result
- }
-
- disconnectListener := func(t *testing.T, channelName string) {
- t.Helper()
-
- q := `SELECT PG_TERMINATE_BACKEND(pid) FROM PG_STAT_ACTIVITY WHERE datname = $1 AND query = $2`
- res, err := db.Exec(q, db.Name, fmt.Sprintf("LISTEN %q", channelName))
- if assert.NoError(t, err) {
- affected, err := res.RowsAffected()
- assert.NoError(t, err)
- assert.EqualValues(t, 1, affected)
- }
- }
-
- waitFor := func(t *testing.T, c <-chan struct{}, d time.Duration) {
- t.Helper()
-
- select {
- case <-time.After(d):
- require.FailNow(t, "it takes too long")
- case <-c:
- // proceed
- }
- }
-
- t.Run("single handler and single notifier", func(t *testing.T) {
- opts := newOpts()
- opts.Channels = []string{newChannel()}
-
- payloads := []string{"this", "is", "a", "payload"}
-
- listener, result := listenNotify(t, opts, 1, payloads)
- defer func() { require.NoError(t, listener.Close()) }()
- require.Equal(t, payloads, result)
- })
-
- t.Run("single handler and multiple notifiers", func(t *testing.T) {
- opts := newOpts()
- opts.Channels = []string{newChannel()}
-
- numNotifiers := 10
-
- payloads := []string{"this", "is", "a", "payload"}
- var expResult []string
- for i := 0; i < numNotifiers; i++ {
- expResult = append(expResult, payloads...)
- }
-
- listener, result := listenNotify(t, opts, numNotifiers, payloads)
- defer func() { require.NoError(t, listener.Close()) }()
- require.ElementsMatch(t, expResult, result, "there must be no additional data, only expected")
- })
-
- t.Run("multiple channels", func(t *testing.T) {
- opts := newOpts()
- opts.Channels = []string{"channel_1", "channel_2"}
-
- start := make(chan struct{})
- resultChan := make(chan glsql.Notification)
- handler := mockListenHandler{
- OnNotification: func(n glsql.Notification) { resultChan <- n },
- OnConnected: func() { close(start) },
- }
-
- listener, err := NewPostgresListener(logger, opts, handler)
- require.NoError(t, err)
- defer func() { require.NoError(t, listener.Close()) }()
-
- waitFor(t, start, time.Minute)
-
- var expectedNotifications []glsql.Notification
- for i := 0; i < 3; i++ {
- for _, channel := range opts.Channels {
- payload := fmt.Sprintf("%s:%d", channel, i)
- notifyListener(t, channel, payload)
-
- expectedNotifications = append(expectedNotifications, glsql.Notification{
- Channel: channel,
- Payload: payload,
- })
- }
- }
-
- tooLong := time.After(time.Minute)
- var actualNotifications []glsql.Notification
- for i := 0; i < len(expectedNotifications); i++ {
- select {
- case <-tooLong:
- require.FailNow(t, "no notifications for too long")
- case notification := <-resultChan:
- actualNotifications = append(actualNotifications, notification)
- }
- }
-
- require.Equal(t, expectedNotifications, actualNotifications)
- })
-
- t.Run("invalid connection", func(t *testing.T) {
- opts := newOpts()
- opts.Addr = "invalid-address"
- opts.Channels = []string{"stub"}
-
- logger, hook := test.NewNullLogger()
-
- _, err := NewPostgresListener(logger, opts, mockListenHandler{})
- require.Error(t, err)
- require.Regexp(t, "^connect: .*invalid-address.*", err.Error())
-
- entries := hook.AllEntries()
- require.GreaterOrEqualf(t, len(entries), 1, "it should log at least failed initial attempt to connect")
- require.Equal(t, "connection_attempt_failed", entries[0].Message)
- })
-
- t.Run("channel used more then once", func(t *testing.T) {
- opts := newOpts()
- opts.Channels = []string{"stub1", "stub2", "stub1"}
-
- _, err := NewPostgresListener(logger, opts, mockListenHandler{})
- require.True(t, errors.Is(err, pq.ErrChannelAlreadyOpen), err)
- })
-
- t.Run("connection interruption", func(t *testing.T) {
- opts := newOpts()
- opts.Channels = []string{newChannel()}
-
- connected := make(chan struct{}, 1)
- handler := mockListenHandler{OnConnected: func() { connected <- struct{}{} }}
-
- listener, err := NewPostgresListener(logger, opts, handler)
- require.NoError(t, err)
-
- waitFor(t, connected, time.Minute)
- disconnectListener(t, opts.Channels[0])
- waitFor(t, connected, time.Minute)
- require.NoError(t, listener.Close())
-
- err = testutil.CollectAndCompare(listener, strings.NewReader(`
- # HELP gitaly_praefect_notifications_reconnects_total Counts amount of reconnects to listen for notification from PostgreSQL
- # TYPE gitaly_praefect_notifications_reconnects_total counter
- gitaly_praefect_notifications_reconnects_total{state="connected"} 1
- gitaly_praefect_notifications_reconnects_total{state="disconnected"} 1
- gitaly_praefect_notifications_reconnects_total{state="reconnected"} 1
- `))
- require.NoError(t, err)
- })
-
- t.Run("persisted connection interruption", func(t *testing.T) {
- opts := newOpts()
- opts.Channels = []string{newChannel()}
-
- connected := make(chan struct{}, 1)
- disconnected := make(chan struct{}, 1)
- handler := mockListenHandler{
- OnConnected: func() { connected <- struct{}{} },
- OnDisconnect: func(err error) {
- assert.Error(t, err, "disconnect event should always receive non-nil error")
- disconnected <- struct{}{}
- },
- }
-
- listener, err := NewPostgresListener(logger, opts, handler)
- require.NoError(t, err)
-
- for i := 0; i < 3; i++ {
- waitFor(t, connected, time.Minute)
- disconnectListener(t, opts.Channels[0])
- waitFor(t, disconnected, time.Minute)
- }
-
- // this additional step is required to have exactly 3 "reconnected" metric value, otherwise it could
- // be 2 or 3 - it depends if it was quick enough to re-establish a new connection or not.
- waitFor(t, connected, time.Minute)
-
- require.NoError(t, listener.Close())
-
- err = testutil.CollectAndCompare(listener, strings.NewReader(`
- # HELP gitaly_praefect_notifications_reconnects_total Counts amount of reconnects to listen for notification from PostgreSQL
- # TYPE gitaly_praefect_notifications_reconnects_total counter
- gitaly_praefect_notifications_reconnects_total{state="connected"} 1
- gitaly_praefect_notifications_reconnects_total{state="disconnected"} 3
- gitaly_praefect_notifications_reconnects_total{state="reconnected"} 3
- `))
- require.NoError(t, err)
- })
-}
-
-func requireEqualNotificationEntries(t *testing.T, d string, entries []notificationEntry) {
- t.Helper()
-
- var nes []notificationEntry
- require.NoError(t, json.NewDecoder(strings.NewReader(d)).Decode(&nes))
-
- for _, es := range [][]notificationEntry{entries, nes} {
- for _, e := range es {
- sort.Strings(e.RelativePaths)
- }
- sort.Slice(es, func(i, j int) bool { return es[i].VirtualStorage < es[j].VirtualStorage })
- }
-
- require.EqualValues(t, entries, nes)
-}
-
-func TestPostgresListener_Listen_repositories_delete(t *testing.T) {
- t.Parallel()
- db := testdb.New(t)
-
- const channel = "repositories_updates"
-
- testListener(
- t,
- db.Name,
- "repositories_updates",
- func(t *testing.T) {
- _, err := db.DB.Exec(`
- INSERT INTO repositories
- VALUES ('praefect-1', '/path/to/repo/1', 1, 1),
- ('praefect-1', '/path/to/repo/2', 1, 2),
- ('praefect-1', '/path/to/repo/3', 0, 3),
- ('praefect-2', '/path/to/repo/1', 1, 4)`)
- require.NoError(t, err)
- },
- func(t *testing.T) {
- _, err := db.DB.Exec(`DELETE FROM repositories WHERE generation > 0`)
- require.NoError(t, err)
- },
- func(t *testing.T, n glsql.Notification) {
- require.Equal(t, channel, n.Channel)
- requireEqualNotificationEntries(t, n.Payload, []notificationEntry{
- {VirtualStorage: "praefect-1", RelativePaths: []string{"/path/to/repo/1", "/path/to/repo/2"}},
- {VirtualStorage: "praefect-2", RelativePaths: []string{"/path/to/repo/1"}},
- })
- },
- )
-}
-
-func TestPostgresListener_Listen_storage_repositories_insert(t *testing.T) {
- t.Parallel()
- db := testdb.New(t)
-
- const channel = "storage_repositories_updates"
-
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- testListener(
- t,
- db.Name,
- channel,
- func(t *testing.T) {
- rs := NewPostgresRepositoryStore(db, nil)
- require.NoError(t, rs.CreateRepository(ctx, 1, "praefect-1", "/path/to/repo", "replica-path", "primary", nil, nil, true, false))
- },
- func(t *testing.T) {
- _, err := db.DB.Exec(`
- INSERT INTO storage_repositories
- VALUES ('praefect-1', '/path/to/repo', 'gitaly-1', 0, 1),
- ('praefect-1', '/path/to/repo', 'gitaly-2', 0, 1)`,
- )
- require.NoError(t, err)
- },
- func(t *testing.T, n glsql.Notification) {
- require.Equal(t, channel, n.Channel)
- requireEqualNotificationEntries(t, n.Payload, []notificationEntry{{VirtualStorage: "praefect-1", RelativePaths: []string{"/path/to/repo"}}})
- },
- )
-}
-
-func TestPostgresListener_Listen_storage_repositories_update(t *testing.T) {
- t.Parallel()
- db := testdb.New(t)
-
- const channel = "storage_repositories_updates"
-
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- testListener(
- t,
- db.Name,
- channel,
- func(t *testing.T) {
- rs := NewPostgresRepositoryStore(db, nil)
- require.NoError(t, rs.CreateRepository(ctx, 1, "praefect-1", "/path/to/repo", "replica-path", "gitaly-1", nil, nil, true, false))
- },
- func(t *testing.T) {
- _, err := db.DB.Exec(`UPDATE storage_repositories SET generation = generation + 1`)
- require.NoError(t, err)
- },
- func(t *testing.T, n glsql.Notification) {
- require.Equal(t, channel, n.Channel)
- requireEqualNotificationEntries(t, n.Payload, []notificationEntry{{VirtualStorage: "praefect-1", RelativePaths: []string{"/path/to/repo"}}})
- },
- )
-}
-
-func TestPostgresListener_Listen_storage_empty_notification(t *testing.T) {
- t.Parallel()
- db := testdb.New(t)
-
- const channel = "storage_repositories_updates"
-
- testListener(
- t,
- db.Name,
- channel,
- func(t *testing.T) {},
- func(t *testing.T) {
- _, err := db.DB.Exec(`UPDATE storage_repositories SET generation = 1`)
- require.NoError(t, err)
- },
- nil, // no notification events expected
- )
-}
-
-func TestPostgresListener_Listen_storage_repositories_delete(t *testing.T) {
- t.Parallel()
- db := testdb.New(t)
-
- const channel = "storage_repositories_updates"
-
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- testListener(
- t,
- db.Name,
- channel,
- func(t *testing.T) {
- rs := NewPostgresRepositoryStore(db, nil)
- require.NoError(t, rs.CreateRepository(ctx, 1, "praefect-1", "/path/to/repo", "replica-path", "gitaly-1", nil, nil, true, false))
- },
- func(t *testing.T) {
- _, err := db.DB.Exec(`DELETE FROM storage_repositories`)
- require.NoError(t, err)
- },
- func(t *testing.T, n glsql.Notification) {
- require.Equal(t, channel, n.Channel)
- requireEqualNotificationEntries(t, n.Payload, []notificationEntry{{VirtualStorage: "praefect-1", RelativePaths: []string{"/path/to/repo"}}})
- },
- )
-}
-
-func testListener(t *testing.T, dbName, channel string, setup func(t *testing.T), trigger func(t *testing.T), verifier func(t *testing.T, notification glsql.Notification)) {
- setup(t)
-
- readyChan := make(chan struct{})
- receivedChan := make(chan struct{})
- var notification glsql.Notification
-
- callback := func(n glsql.Notification) {
- select {
- case <-receivedChan:
- return
- default:
- notification = n
- close(receivedChan)
- }
- }
-
- opts := DefaultPostgresListenerOpts
- opts.Addr = glsql.DSN(testdb.GetConfig(t, dbName), true)
- opts.Channels = []string{channel}
-
- handler := mockListenHandler{OnNotification: callback, OnConnected: func() { close(readyChan) }}
-
- pgl, err := NewPostgresListener(testhelper.NewDiscardingLogger(t), opts, handler)
- require.NoError(t, err)
- defer pgl.Close()
-
- select {
- case <-time.After(time.Second):
- require.FailNow(t, "no connection for too long period")
- case <-readyChan:
- }
-
- trigger(t)
-
- select {
- case <-time.After(time.Second):
- if verifier == nil {
- // no notifications expected
- return
- }
- require.FailNow(t, "no notifications for too long period")
- case <-receivedChan:
- }
-
- if verifier == nil {
- require.Failf(t, "no notifications expected", "received: %v", notification)
- }
- verifier(t, notification)
-}
diff --git a/internal/praefect/datastore/listener_test.go b/internal/praefect/datastore/listener_test.go
index 21a8b3d6c..24ea9df6d 100644
--- a/internal/praefect/datastore/listener_test.go
+++ b/internal/praefect/datastore/listener_test.go
@@ -2,8 +2,10 @@ package datastore
import (
"context"
+ "encoding/json"
"errors"
"fmt"
+ "sort"
"strings"
"testing"
"time"
@@ -452,3 +454,43 @@ func verifyListener(t *testing.T, ctx context.Context, dbConf config.DB, channel
verifier(t, notification)
waitFor(t, done)
}
+
+func requireEqualNotificationEntries(t *testing.T, d string, entries []notificationEntry) {
+ t.Helper()
+
+ var nes []notificationEntry
+ require.NoError(t, json.NewDecoder(strings.NewReader(d)).Decode(&nes))
+
+ for _, es := range [][]notificationEntry{entries, nes} {
+ for _, e := range es {
+ sort.Strings(e.RelativePaths)
+ }
+ sort.Slice(es, func(i, j int) bool { return es[i].VirtualStorage < es[j].VirtualStorage })
+ }
+
+ require.EqualValues(t, entries, nes)
+}
+
+type mockListenHandler struct {
+ OnNotification func(glsql.Notification)
+ OnDisconnect func(error)
+ OnConnected func()
+}
+
+func (mlh mockListenHandler) Notification(n glsql.Notification) {
+ if mlh.OnNotification != nil {
+ mlh.OnNotification(n)
+ }
+}
+
+func (mlh mockListenHandler) Disconnect(err error) {
+ if mlh.OnDisconnect != nil {
+ mlh.OnDisconnect(err)
+ }
+}
+
+func (mlh mockListenHandler) Connected() {
+ if mlh.OnConnected != nil {
+ mlh.OnConnected()
+ }
+}