Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2021-11-16 19:59:31 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2021-11-16 19:59:31 +0300
commitfa91df189520e91c5006594e5ef83397c1b4c635 (patch)
tree2d023605e904ced7764d3c59fb489d94678a39fc
parent2da10260b5fd57a80ed318071d8eb090576fcf4f (diff)
fix concurrency testingsmh-14-0-stable-concurrency-test
-rw-r--r--internal/praefect/datastore/glsql/testing.go30
-rw-r--r--internal/praefect/nodes/per_repository.go9
-rw-r--r--internal/praefect/nodes/per_repository_test.go27
3 files changed, 55 insertions, 11 deletions
diff --git a/internal/praefect/datastore/glsql/testing.go b/internal/praefect/datastore/glsql/testing.go
index e1f8145e1..8c89babdf 100644
--- a/internal/praefect/datastore/glsql/testing.go
+++ b/internal/praefect/datastore/glsql/testing.go
@@ -1,6 +1,7 @@
package glsql
import (
+ "context"
"database/sql"
"errors"
"os"
@@ -9,6 +10,7 @@ import (
"strings"
"sync"
"testing"
+ "time"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
@@ -201,3 +203,31 @@ func getEnvFromGDK(t testing.TB) {
require.NoError(t, os.Setenv(key, value), "set env var %v", key)
}
}
+
+// WaitForQueries is a helper that waits until a certain number of queries matching the prefix are present in the
+// database. This is useful for ensuring multiple transactions are executing the query when testing concurrent
+// execution.
+func WaitForQueries(ctx context.Context, t testing.TB, db Querier, queryPrefix string, count int) {
+ t.Helper()
+
+ for {
+ var queriesPresent bool
+ require.NoError(t, db.QueryRowContext(ctx, `
+ SELECT COUNT(*) = $2
+ FROM pg_stat_activity
+ WHERE TRIM(e'\n' FROM query) LIKE $1
+ `, queryPrefix+"%", count).Scan(&queriesPresent))
+
+ if queriesPresent {
+ return
+ }
+
+ retry := time.NewTimer(time.Millisecond)
+ select {
+ case <-ctx.Done():
+ retry.Stop()
+ return
+ case <-retry.C:
+ }
+ }
+}
diff --git a/internal/praefect/nodes/per_repository.go b/internal/praefect/nodes/per_repository.go
index 92342b66f..a61f5342a 100644
--- a/internal/praefect/nodes/per_repository.go
+++ b/internal/praefect/nodes/per_repository.go
@@ -87,7 +87,11 @@ func (pr *PerRepositoryElector) Run(ctx context.Context, trigger <-chan struct{}
func (pr *PerRepositoryElector) performFailovers(ctx context.Context) error {
rows, err := pr.db.QueryContext(ctx, `
-WITH updated AS (
+WITH election_lock AS (
+ SELECT pg_try_advisory_xact_lock(1020) AS acquired
+),
+
+updated AS (
UPDATE repositories
SET "primary" = (
SELECT storage
@@ -97,12 +101,13 @@ WITH updated AS (
ORDER BY random()
LIMIT 1
)
+ FROM election_lock
WHERE NOT EXISTS (
SELECT FROM valid_primaries
WHERE virtual_storage = repositories.virtual_storage
AND relative_path = repositories.relative_path
AND storage = repositories."primary"
- )
+ ) AND acquired
RETURNING virtual_storage, relative_path, "primary"
),
diff --git a/internal/praefect/nodes/per_repository_test.go b/internal/praefect/nodes/per_repository_test.go
index 021a6465e..e6ea808af 100644
--- a/internal/praefect/nodes/per_repository_test.go
+++ b/internal/praefect/nodes/per_repository_test.go
@@ -1,3 +1,4 @@
+//go:build postgres
// +build postgres
package nodes
@@ -6,6 +7,7 @@ import (
"context"
"database/sql"
"errors"
+ "sync"
"testing"
"time"
@@ -617,9 +619,8 @@ func TestPerRepositoryElector(t *testing.T) {
}
for _, step := range tc.steps {
+ var wg sync.WaitGroup
runElection := func(tx *sql.Tx, matchLogs logMatcher) {
- setHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect-0": step.healthyNodes})
-
// The first transaction runs first
logger, hook := test.NewNullLogger()
elector := NewPerRepositoryElector(logrus.NewEntry(logger), tx)
@@ -631,14 +632,12 @@ func TestPerRepositoryElector(t *testing.T) {
require.NoError(t, elector.Run(ctx, trigger))
- primary, err := elector.GetPrimary(ctx, "virtual-storage-1", "relative-path-1")
- assert.Equal(t, step.error, err)
- step.primary(t, primary)
-
require.Len(t, hook.Entries, 3)
matchLogs(t, hook.Entries[1])
}
+ setHealthyNodes(t, ctx, db, map[string]map[string][]string{"praefect-0": step.healthyNodes})
+
// Run every step with two concurrent transactions to ensure two Praefect's running
// election at the same time do not elect the primary multiple times. We begin both
// transactions at the same time to ensure they have the same snapshot of the
@@ -656,13 +655,23 @@ func TestPerRepositoryElector(t *testing.T) {
runElection(txFirst, step.matchLogs)
- require.NoError(t, txFirst.Commit())
-
+ wg.Add(1)
// Run the second election on the same database snapshot. This should result in no changes.
// Running this prior to the first transaction committing would block.
- runElection(txSecond, noChanges)
+ go func() {
+ defer wg.Done()
+ runElection(txSecond, noChanges)
+ }()
+
+ glsql.WaitForQueries(ctx, t, db, "WITH election_lock AS", 2)
+ require.NoError(t, txFirst.Commit())
+ wg.Wait()
require.NoError(t, txSecond.Commit())
+
+ primary, err := NewPerRepositoryElector(testhelper.DiscardTestLogger(t), db).GetPrimary(ctx, "virtual-storage-1", "relative-path-1")
+ assert.Equal(t, step.error, err)
+ step.primary(t, primary)
}
})
}