diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2021-02-02 21:51:29 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2021-02-03 14:17:20 +0300 |
commit | 5d63cf350e5a91f841e17bb11cc34b79d52b9373 (patch) | |
tree | ee05c0b260944ba3c0caf4c870ce112c792c68a1 | |
parent | b3e961e4f85ac1af2b7347db48b00f9e6c617045 (diff) |
support default replication factor in coordinator
This commit adds support for default replication factor in
Praefect's coordinator and hooks up the configuration option with
the rest of the code.
When a default replication factor is configured for a virtual storage
and repository specific primaries are enabled, the coordinator stores
the nodes participating in the repository's creation as the assigned
nodes.
-rw-r--r-- | cmd/praefect/main.go | 2 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 8 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 42 |
3 files changed, 38 insertions, 14 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index b59dc560c..fb1f553a1 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -338,7 +338,7 @@ func run(cfgs []starter.Config, conf config.Config) error { praefect.NewLockedRandom(rand.New(rand.NewSource(time.Now().UnixNano()))), rs, assignmentStore, - map[string]int{}, + conf.DefaultReplicationFactors(), ) } else { healthChecker = praefect.HealthChecker(nodeManager) diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 011e4b0a5..930993490 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -763,13 +763,17 @@ func (c *Coordinator) newRequestFinalizer( ctxlogrus.Extract(ctx).WithError(err).Info("deleted repository does not have a store entry") } case datastore.CreateRepo: + repositorySpecificPrimariesEnabled := c.conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository + variableReplicationFactorEnabled := repositorySpecificPrimariesEnabled && + c.conf.DefaultReplicationFactors()[virtualStorage] > 0 + if err := c.rs.CreateRepository(ctx, virtualStorage, targetRepo.GetRelativePath(), primary, append(updatedSecondaries, outdatedSecondaries...), - c.conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository, - false, + repositorySpecificPrimariesEnabled, + variableReplicationFactorEnabled, ); err != nil { if !errors.Is(err, datastore.RepositoryExistsError{}) { return fmt.Errorf("create repository: %w", err) diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 7c78b7ea6..844554328 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -596,19 +596,34 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) { func TestStreamDirector_repo_creation(t *testing.T) { for _, tc := range []struct { - electionStrategy config.ElectionStrategy - primaryStored bool + desc string + electionStrategy config.ElectionStrategy + replicationFactor int + primaryStored bool + assignmentsStored bool }{ { - electionStrategy: config.ElectionStrategySQL, - primaryStored: false, + desc: "virtual storage scoped primaries", + electionStrategy: config.ElectionStrategySQL, + replicationFactor: 3, // assignments are not set when not using repository specific primaries + primaryStored: false, + assignmentsStored: false, }, { - electionStrategy: config.ElectionStrategyPerRepository, - primaryStored: true, + desc: "repository specific primaries without variable replication factor", + electionStrategy: config.ElectionStrategyPerRepository, + primaryStored: true, + assignmentsStored: false, + }, + { + desc: "repository specific primaries with variable replication factor", + electionStrategy: config.ElectionStrategyPerRepository, + replicationFactor: 3, + primaryStored: true, + assignmentsStored: true, }, } { - t.Run(string(tc.electionStrategy), func(t *testing.T) { + t.Run(tc.desc, func(t *testing.T) { primaryNode := &config.Node{Storage: "praefect-internal-1"} healthySecondaryNode := &config.Node{Storage: "praefect-internal-2"} unhealthySecondaryNode := &config.Node{Storage: "praefect-internal-3"} @@ -616,8 +631,9 @@ func TestStreamDirector_repo_creation(t *testing.T) { Failover: config.Failover{ElectionStrategy: tc.electionStrategy}, VirtualStorages: []*config.VirtualStorage{ &config.VirtualStorage{ - Name: "praefect", - Nodes: []*config.Node{primaryNode, healthySecondaryNode, unhealthySecondaryNode}, + Name: "praefect", + DefaultReplicationFactor: tc.replicationFactor, + Nodes: []*config.Node{primaryNode, healthySecondaryNode, unhealthySecondaryNode}, }, }, } @@ -642,8 +658,9 @@ func TestStreamDirector_repo_creation(t *testing.T) { assert.Equal(t, targetRepo.StorageName, virtualStorage) assert.Equal(t, targetRepo.RelativePath, relativePath) assert.Equal(t, rewrittenStorage, primary) + assert.ElementsMatch(t, []string{healthySecondaryNode.Storage, unhealthySecondaryNode.Storage}, secondaries) assert.Equal(t, tc.primaryStored, storePrimary) - assert.False(t, storeAssignments) + assert.Equal(t, tc.assignmentsStored, storeAssignments) return nil }, } @@ -697,10 +714,13 @@ func TestStreamDirector_repo_creation(t *testing.T) { require.Equal(t, n, 2, "number of primary candidates should match the number of healthy nodes") return 0 }, + shuffleFunc: func(n int, swap func(int, int)) { + require.Equal(t, n, 2, "number of secondary candiates should match the number of node minus the primary") + }, }, nil, nil, - map[string]int{}, + conf.DefaultReplicationFactors(), ) default: t.Fatalf("unexpected election strategy: %q", tc.electionStrategy) |