diff options
Diffstat (limited to 'internal/praefect/nodes/per_repository_test.go')
-rw-r--r-- | internal/praefect/nodes/per_repository_test.go | 56 |
1 files changed, 42 insertions, 14 deletions
diff --git a/internal/praefect/nodes/per_repository_test.go b/internal/praefect/nodes/per_repository_test.go index 546386fa5..067b132e4 100644 --- a/internal/praefect/nodes/per_repository_test.go +++ b/internal/praefect/nodes/per_repository_test.go @@ -502,24 +502,52 @@ func TestPerRepositoryElector(t *testing.T) { } for _, step := range tc.steps { - logger, hook := test.NewNullLogger() - elector := NewPerRepositoryElector(logrus.NewEntry(logger), db, - HealthConsensusFunc(func() map[string][]string { return step.healthyNodes }), - ) - elector.handleError = func(err error) error { return err } + runElection := func(tx *sql.Tx, matchLogs logMatcher) { + // The first transaction runs first + logger, hook := test.NewNullLogger() + elector := NewPerRepositoryElector(logrus.NewEntry(logger), tx, + HealthConsensusFunc(func() map[string][]string { return step.healthyNodes }), + ) + elector.handleError = func(err error) error { return err } + + trigger := make(chan struct{}, 1) + trigger <- struct{}{} + close(trigger) + + require.NoError(t, elector.Run(ctx, trigger)) + + primary, err := elector.GetPrimary(ctx, "virtual-storage-1", "relative-path-1") + assert.Equal(t, step.error, err) + step.primary(t, primary) + + require.Len(t, hook.Entries, 3) + matchLogs(t, hook.Entries[1]) + } + + // Run every step with two concurrent transactions to ensure two Praefect's running + // election at the same time do not elect the primary multiple times. We begin both + // transactions at the same time to ensure they have the same snapshot of the + // database. The second transaction would be blocked until the first transaction commits. + // To verify concurrent election runs do not elect the primary multiple times, we assert + // the second transaction performed no changes and the primary is what the first run elected + // it to be. + txFirst, err := db.Begin() + require.NoError(t, err) + defer txFirst.Rollback() + + txSecond, err := db.Begin() + require.NoError(t, err) + defer txSecond.Rollback() - trigger := make(chan struct{}, 1) - trigger <- struct{}{} - close(trigger) + runElection(txFirst, step.matchLogs) - require.NoError(t, elector.Run(ctx, trigger)) + require.NoError(t, txFirst.Commit()) - primary, err := elector.GetPrimary(ctx, "virtual-storage-1", "relative-path-1") - assert.Equal(t, step.error, err) - step.primary(t, primary) + // Run the second election on the same database snapshot. This should result in no changes. + // Running this prior to the first transaction committing would block. + runElection(txSecond, noChanges) - require.Len(t, hook.Entries, 3) - step.matchLogs(t, hook.Entries[1]) + require.NoError(t, txSecond.Commit()) } }) } |