Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2021-02-02 21:51:29 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2021-02-03 14:17:20 +0300
commit5d63cf350e5a91f841e17bb11cc34b79d52b9373 (patch)
treeee05c0b260944ba3c0caf4c870ce112c792c68a1
parentb3e961e4f85ac1af2b7347db48b00f9e6c617045 (diff)
support default replication factor in coordinator
This commit adds support for default replication factor in Praefect's coordinator and hooks up the configuration option with the rest of the code. When a default replication factor is configured for a virtual storage and repository specific primaries are enabled, the coordinator stores the nodes participating in the repository's creation as the assigned nodes.
-rw-r--r--cmd/praefect/main.go2
-rw-r--r--internal/praefect/coordinator.go8
-rw-r--r--internal/praefect/coordinator_test.go42
3 files changed, 38 insertions, 14 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index b59dc560c..fb1f553a1 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -338,7 +338,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
praefect.NewLockedRandom(rand.New(rand.NewSource(time.Now().UnixNano()))),
rs,
assignmentStore,
- map[string]int{},
+ conf.DefaultReplicationFactors(),
)
} else {
healthChecker = praefect.HealthChecker(nodeManager)
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 011e4b0a5..930993490 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -763,13 +763,17 @@ func (c *Coordinator) newRequestFinalizer(
ctxlogrus.Extract(ctx).WithError(err).Info("deleted repository does not have a store entry")
}
case datastore.CreateRepo:
+ repositorySpecificPrimariesEnabled := c.conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository
+ variableReplicationFactorEnabled := repositorySpecificPrimariesEnabled &&
+ c.conf.DefaultReplicationFactors()[virtualStorage] > 0
+
if err := c.rs.CreateRepository(ctx,
virtualStorage,
targetRepo.GetRelativePath(),
primary,
append(updatedSecondaries, outdatedSecondaries...),
- c.conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository,
- false,
+ repositorySpecificPrimariesEnabled,
+ variableReplicationFactorEnabled,
); err != nil {
if !errors.Is(err, datastore.RepositoryExistsError{}) {
return fmt.Errorf("create repository: %w", err)
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 7c78b7ea6..844554328 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -596,19 +596,34 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) {
func TestStreamDirector_repo_creation(t *testing.T) {
for _, tc := range []struct {
- electionStrategy config.ElectionStrategy
- primaryStored bool
+ desc string
+ electionStrategy config.ElectionStrategy
+ replicationFactor int
+ primaryStored bool
+ assignmentsStored bool
}{
{
- electionStrategy: config.ElectionStrategySQL,
- primaryStored: false,
+ desc: "virtual storage scoped primaries",
+ electionStrategy: config.ElectionStrategySQL,
+ replicationFactor: 3, // assignments are not set when not using repository specific primaries
+ primaryStored: false,
+ assignmentsStored: false,
},
{
- electionStrategy: config.ElectionStrategyPerRepository,
- primaryStored: true,
+ desc: "repository specific primaries without variable replication factor",
+ electionStrategy: config.ElectionStrategyPerRepository,
+ primaryStored: true,
+ assignmentsStored: false,
+ },
+ {
+ desc: "repository specific primaries with variable replication factor",
+ electionStrategy: config.ElectionStrategyPerRepository,
+ replicationFactor: 3,
+ primaryStored: true,
+ assignmentsStored: true,
},
} {
- t.Run(string(tc.electionStrategy), func(t *testing.T) {
+ t.Run(tc.desc, func(t *testing.T) {
primaryNode := &config.Node{Storage: "praefect-internal-1"}
healthySecondaryNode := &config.Node{Storage: "praefect-internal-2"}
unhealthySecondaryNode := &config.Node{Storage: "praefect-internal-3"}
@@ -616,8 +631,9 @@ func TestStreamDirector_repo_creation(t *testing.T) {
Failover: config.Failover{ElectionStrategy: tc.electionStrategy},
VirtualStorages: []*config.VirtualStorage{
&config.VirtualStorage{
- Name: "praefect",
- Nodes: []*config.Node{primaryNode, healthySecondaryNode, unhealthySecondaryNode},
+ Name: "praefect",
+ DefaultReplicationFactor: tc.replicationFactor,
+ Nodes: []*config.Node{primaryNode, healthySecondaryNode, unhealthySecondaryNode},
},
},
}
@@ -642,8 +658,9 @@ func TestStreamDirector_repo_creation(t *testing.T) {
assert.Equal(t, targetRepo.StorageName, virtualStorage)
assert.Equal(t, targetRepo.RelativePath, relativePath)
assert.Equal(t, rewrittenStorage, primary)
+ assert.ElementsMatch(t, []string{healthySecondaryNode.Storage, unhealthySecondaryNode.Storage}, secondaries)
assert.Equal(t, tc.primaryStored, storePrimary)
- assert.False(t, storeAssignments)
+ assert.Equal(t, tc.assignmentsStored, storeAssignments)
return nil
},
}
@@ -697,10 +714,13 @@ func TestStreamDirector_repo_creation(t *testing.T) {
require.Equal(t, n, 2, "number of primary candidates should match the number of healthy nodes")
return 0
},
+ shuffleFunc: func(n int, swap func(int, int)) {
+ require.Equal(t, n, 2, "number of secondary candiates should match the number of node minus the primary")
+ },
},
nil,
nil,
- map[string]int{},
+ conf.DefaultReplicationFactors(),
)
default:
t.Fatalf("unexpected election strategy: %q", tc.electionStrategy)