From fa91df189520e91c5006594e5ef83397c1b4c635 Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Tue, 16 Nov 2021 18:59:31 +0200 Subject: fix concurrency testing --- internal/praefect/datastore/glsql/testing.go | 30 ++++++++++++++++++++++++++ internal/praefect/nodes/per_repository.go | 9 ++++++-- internal/praefect/nodes/per_repository_test.go | 27 +++++++++++++++-------- 3 files changed, 55 insertions(+), 11 deletions(-) diff --git a/internal/praefect/datastore/glsql/testing.go b/internal/praefect/datastore/glsql/testing.go index e1f8145e1..8c89babdf 100644 --- a/internal/praefect/datastore/glsql/testing.go +++ b/internal/praefect/datastore/glsql/testing.go @@ -1,6 +1,7 @@ package glsql import ( + "context" "database/sql" "errors" "os" @@ -9,6 +10,7 @@ import ( "strings" "sync" "testing" + "time" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" @@ -201,3 +203,31 @@ func getEnvFromGDK(t testing.TB) { require.NoError(t, os.Setenv(key, value), "set env var %v", key) } } + +// WaitForQueries is a helper that waits until a certain number of queries matching the prefix are present in the +// database. This is useful for ensuring multiple transactions are executing the query when testing concurrent +// execution. +func WaitForQueries(ctx context.Context, t testing.TB, db Querier, queryPrefix string, count int) { + t.Helper() + + for { + var queriesPresent bool + require.NoError(t, db.QueryRowContext(ctx, ` + SELECT COUNT(*) = $2 + FROM pg_stat_activity + WHERE TRIM(e'\n' FROM query) LIKE $1 + `, queryPrefix+"%", count).Scan(&queriesPresent)) + + if queriesPresent { + return + } + + retry := time.NewTimer(time.Millisecond) + select { + case <-ctx.Done(): + retry.Stop() + return + case <-retry.C: + } + } +} diff --git a/internal/praefect/nodes/per_repository.go b/internal/praefect/nodes/per_repository.go index 92342b66f..a61f5342a 100644 --- a/internal/praefect/nodes/per_repository.go +++ b/internal/praefect/nodes/per_repository.go @@ -87,7 +87,11 @@ func (pr *PerRepositoryElector) Run(ctx context.Context, trigger <-chan struct{} func (pr *PerRepositoryElector) performFailovers(ctx context.Context) error { rows, err := pr.db.QueryContext(ctx, ` -WITH updated AS ( +WITH election_lock AS ( + SELECT pg_try_advisory_xact_lock(1020) AS acquired +), + +updated AS ( UPDATE repositories SET "primary" = ( SELECT storage @@ -97,12 +101,13 @@ WITH updated AS ( ORDER BY random() LIMIT 1 ) + FROM election_lock WHERE NOT EXISTS ( SELECT FROM valid_primaries WHERE virtual_storage = repositories.virtual_storage AND relative_path = repositories.relative_path AND storage = repositories."primary" - ) + ) AND acquired RETURNING virtual_storage, relative_path, "primary" ), diff --git a/internal/praefect/nodes/per_repository_test.go b/internal/praefect/nodes/per_repository_test.go index 021a6465e..e6ea808af 100644 --- a/internal/praefect/nodes/per_repository_test.go +++ b/internal/praefect/nodes/per_repository_test.go @@ -1,3 +1,4 @@ +//go:build postgres // +build postgres package nodes @@ -6,6 +7,7 @@ import ( "context" "database/sql" "errors" + "sync" "testing" "time" @@ -617,9 +619,8 @@ func TestPerRepositoryElector(t *testing.T) { } for _, step := range tc.steps { + var wg sync.WaitGroup runElection := func(tx *sql.Tx, matchLogs logMatcher) { - setHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect-0": step.healthyNodes}) - // The first transaction runs first logger, hook := test.NewNullLogger() elector := NewPerRepositoryElector(logrus.NewEntry(logger), tx) @@ -631,14 +632,12 @@ func TestPerRepositoryElector(t *testing.T) { require.NoError(t, elector.Run(ctx, trigger)) - primary, err := elector.GetPrimary(ctx, "virtual-storage-1", "relative-path-1") - assert.Equal(t, step.error, err) - step.primary(t, primary) - require.Len(t, hook.Entries, 3) matchLogs(t, hook.Entries[1]) } + setHealthyNodes(t, ctx, db, map[string]map[string][]string{"praefect-0": step.healthyNodes}) + // Run every step with two concurrent transactions to ensure two Praefect's running // election at the same time do not elect the primary multiple times. We begin both // transactions at the same time to ensure they have the same snapshot of the @@ -656,13 +655,23 @@ func TestPerRepositoryElector(t *testing.T) { runElection(txFirst, step.matchLogs) - require.NoError(t, txFirst.Commit()) - + wg.Add(1) // Run the second election on the same database snapshot. This should result in no changes. // Running this prior to the first transaction committing would block. - runElection(txSecond, noChanges) + go func() { + defer wg.Done() + runElection(txSecond, noChanges) + }() + + glsql.WaitForQueries(ctx, t, db, "WITH election_lock AS", 2) + require.NoError(t, txFirst.Commit()) + wg.Wait() require.NoError(t, txSecond.Commit()) + + primary, err := NewPerRepositoryElector(testhelper.DiscardTestLogger(t), db).GetPrimary(ctx, "virtual-storage-1", "relative-path-1") + assert.Equal(t, step.error, err) + step.primary(t, primary) } }) } -- cgit v1.2.3