Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorToon Claes <toon@gitlab.com>2020-12-04 17:08:47 +0300
committerToon Claes <toon@gitlab.com>2020-12-04 17:08:47 +0300
commita6a6ca5098a45507c17d3828fbd309569c19ffdf (patch)
tree58073dc7a124d286e47410bd34c79271837aac72
parent2eb4db13dab06b87382582f5fcddab0c8397463e (diff)
parentc298d3efb7a0a270b0cfc73f51baca6312eb50fe (diff)
Merge branch 'smh-set-replication-factor' into 'master'
Set replication factor for a repository See merge request gitlab-org/gitaly!2851
-rw-r--r--changelogs/unreleased/smh-set-replication-factor.yml5
-rw-r--r--cmd/praefect/main.go8
-rw-r--r--cmd/praefect/subcmd.go17
-rw-r--r--cmd/praefect/subcmd_accept_dataloss_test.go2
-rw-r--r--cmd/praefect/subcmd_dataloss_test.go2
-rw-r--r--cmd/praefect/subcmd_set_replication_factor.go70
-rw-r--r--cmd/praefect/subcmd_set_replication_factor_test.go100
-rw-r--r--internal/praefect/auth_test.go2
-rw-r--r--internal/praefect/datastore/assignment.go129
-rw-r--r--internal/praefect/datastore/assignment_test.go130
-rw-r--r--internal/praefect/helper_test.go2
-rw-r--r--internal/praefect/replicator_test.go2
-rw-r--r--internal/praefect/server.go7
-rw-r--r--internal/praefect/server_factory.go5
-rw-r--r--internal/praefect/server_factory_test.go10
-rw-r--r--internal/praefect/service/info/consistencycheck_test.go2
-rw-r--r--internal/praefect/service/info/replication_factor.go28
-rw-r--r--internal/praefect/service/info/server.go5
-rw-r--r--proto/go/gitalypb/praefect.pb.go279
-rw-r--r--proto/praefect.proto26
-rw-r--r--ruby/proto/gitaly/praefect_pb.rb10
-rw-r--r--ruby/proto/gitaly/praefect_services_pb.rb9
22 files changed, 766 insertions, 84 deletions
diff --git a/changelogs/unreleased/smh-set-replication-factor.yml b/changelogs/unreleased/smh-set-replication-factor.yml
new file mode 100644
index 000000000..990b09392
--- /dev/null
+++ b/changelogs/unreleased/smh-set-replication-factor.yml
@@ -0,0 +1,5 @@
+---
+title: Set replication factor for a repository
+merge_request: 2851
+author:
+type: added
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 9a0ad7485..73dc1ca62 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -108,6 +108,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes/tracker"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/internal/praefect/reconciler"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/service/info"
"gitlab.com/gitlab-org/gitaly/internal/praefect/transactions"
"gitlab.com/gitlab-org/gitaly/internal/version"
"gitlab.com/gitlab-org/labkit/monitoring"
@@ -238,6 +239,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
var rs datastore.RepositoryStore
var sp nodes.StorageProvider
var metricsCollectors []prometheus.Collector
+ var replicationFactorSetter info.ReplicationFactorSetter
if conf.MemoryQueueEnabled {
queue = datastore.NewMemoryReplicationEventQueue(conf)
@@ -326,13 +328,16 @@ func run(cfgs []starter.Config, conf config.Config) error {
}
}()
+ assignmentStore := datastore.NewAssignmentStore(db, conf.StorageNames())
+ replicationFactorSetter = assignmentStore
+
router = praefect.NewPerRepositoryRouter(
nodeSet.Connections(),
elector,
hm,
praefect.NewLockedRandom(rand.New(rand.NewSource(time.Now().UnixNano()))),
rs,
- datastore.NewAssignmentStore(db, conf.StorageNames()),
+ assignmentStore,
)
} else {
healthChecker = praefect.HealthChecker(nodeManager)
@@ -377,6 +382,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
transactionManager,
queue,
rs,
+ replicationFactorSetter,
protoregistry.GitalyProtoPreregistered,
)
)
diff --git a/cmd/praefect/subcmd.go b/cmd/praefect/subcmd.go
index 7b867dc54..9fbff7675 100644
--- a/cmd/praefect/subcmd.go
+++ b/cmd/praefect/subcmd.go
@@ -24,14 +24,15 @@ type subcmd interface {
var (
subcommands = map[string]subcmd{
- "sql-ping": &sqlPingSubcommand{},
- "sql-migrate": &sqlMigrateSubcommand{},
- "dial-nodes": &dialNodesSubcommand{},
- "reconcile": &reconcileSubcommand{},
- "sql-migrate-down": &sqlMigrateDownSubcommand{},
- "sql-migrate-status": &sqlMigrateStatusSubcommand{},
- "dataloss": newDatalossSubcommand(),
- "accept-dataloss": &acceptDatalossSubcommand{},
+ "sql-ping": &sqlPingSubcommand{},
+ "sql-migrate": &sqlMigrateSubcommand{},
+ "dial-nodes": &dialNodesSubcommand{},
+ "reconcile": &reconcileSubcommand{},
+ "sql-migrate-down": &sqlMigrateDownSubcommand{},
+ "sql-migrate-status": &sqlMigrateStatusSubcommand{},
+ "dataloss": newDatalossSubcommand(),
+ "accept-dataloss": &acceptDatalossSubcommand{},
+ "set-replication-factor": newSetReplicatioFactorSubcommand(os.Stdout),
}
)
diff --git a/cmd/praefect/subcmd_accept_dataloss_test.go b/cmd/praefect/subcmd_accept_dataloss_test.go
index 9d2c7a303..7927df9dc 100644
--- a/cmd/praefect/subcmd_accept_dataloss_test.go
+++ b/cmd/praefect/subcmd_accept_dataloss_test.go
@@ -61,7 +61,7 @@ func TestAcceptDatalossSubcommand(t *testing.T) {
},
}
- ln, clean := listenAndServe(t, []svcRegistrar{registerPraefectInfoServer(info.NewServer(nil, conf, q, rs))})
+ ln, clean := listenAndServe(t, []svcRegistrar{registerPraefectInfoServer(info.NewServer(nil, conf, q, rs, nil))})
defer clean()
conf.SocketPath = ln.Addr().String()
diff --git a/cmd/praefect/subcmd_dataloss_test.go b/cmd/praefect/subcmd_dataloss_test.go
index fa5a0533f..9a0b8ddbc 100644
--- a/cmd/praefect/subcmd_dataloss_test.go
+++ b/cmd/praefect/subcmd_dataloss_test.go
@@ -79,7 +79,7 @@ func TestDatalossSubcommand(t *testing.T) {
require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "repository-2", "gitaly-3", 0))
ln, clean := listenAndServe(t, []svcRegistrar{
- registerPraefectInfoServer(info.NewServer(mgr, cfg, nil, gs))})
+ registerPraefectInfoServer(info.NewServer(mgr, cfg, nil, gs, nil))})
defer clean()
for _, tc := range []struct {
desc string
diff --git a/cmd/praefect/subcmd_set_replication_factor.go b/cmd/praefect/subcmd_set_replication_factor.go
new file mode 100644
index 000000000..b179b0153
--- /dev/null
+++ b/cmd/praefect/subcmd_set_replication_factor.go
@@ -0,0 +1,70 @@
+package main
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "io"
+ "strings"
+
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+)
+
+const paramReplicationFactor = "replication-factor"
+
+type setReplicationFactorSubcommand struct {
+ stdout io.Writer
+ virtualStorage string
+ relativePath string
+ replicationFactor int
+}
+
+func newSetReplicatioFactorSubcommand(stdout io.Writer) *setReplicationFactorSubcommand {
+ return &setReplicationFactorSubcommand{stdout: stdout}
+}
+
+func (cmd *setReplicationFactorSubcommand) FlagSet() *flag.FlagSet {
+ fs := flag.NewFlagSet("set-replication-factor", flag.ContinueOnError)
+ fs.StringVar(&cmd.virtualStorage, paramVirtualStorage, "", "name of the repository's virtual storage")
+ fs.StringVar(&cmd.relativePath, paramRelativePath, "", "repository to set the replication factor for")
+ fs.IntVar(&cmd.replicationFactor, paramReplicationFactor, -1, "desired replication factor")
+ return fs
+}
+
+func (cmd *setReplicationFactorSubcommand) Exec(flags *flag.FlagSet, cfg config.Config) error {
+ if flags.NArg() > 0 {
+ return unexpectedPositionalArgsError{Command: flags.Name()}
+ } else if cmd.virtualStorage == "" {
+ return requiredParameterError(paramVirtualStorage)
+ } else if cmd.relativePath == "" {
+ return requiredParameterError(paramRelativePath)
+ } else if cmd.replicationFactor < 0 {
+ return requiredParameterError(paramReplicationFactor)
+ }
+
+ nodeAddr, err := getNodeAddress(cfg)
+ if err != nil {
+ return err
+ }
+
+ conn, err := subCmdDial(nodeAddr, cfg.Auth.Token)
+ if err != nil {
+ return fmt.Errorf("error dialing: %w", err)
+ }
+ defer conn.Close()
+
+ client := gitalypb.NewPraefectInfoServiceClient(conn)
+ resp, err := client.SetReplicationFactor(context.TODO(), &gitalypb.SetReplicationFactorRequest{
+ VirtualStorage: cmd.virtualStorage,
+ RelativePath: cmd.relativePath,
+ ReplicationFactor: int32(cmd.replicationFactor),
+ })
+ if err != nil {
+ return err
+ }
+
+ fmt.Fprintf(cmd.stdout, "current assignments: %v", strings.Join(resp.Storages, ", "))
+
+ return nil
+}
diff --git a/cmd/praefect/subcmd_set_replication_factor_test.go b/cmd/praefect/subcmd_set_replication_factor_test.go
new file mode 100644
index 000000000..b56d566ed
--- /dev/null
+++ b/cmd/praefect/subcmd_set_replication_factor_test.go
@@ -0,0 +1,100 @@
+// +build postgres
+
+package main
+
+import (
+ "bytes"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/service/info"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+func TestSetReplicationFactorSubcommand(t *testing.T) {
+ for _, tc := range []struct {
+ desc string
+ args []string
+ error error
+ stdout string
+ }{
+ {
+ desc: "unexpected positional arguments",
+ args: []string{"positonal-arg"},
+ error: unexpectedPositionalArgsError{Command: "set-replication-factor"},
+ },
+ {
+ desc: "missing virtual-storage",
+ args: []string{},
+ error: requiredParameterError("virtual-storage"),
+ },
+ {
+ desc: "missing repository",
+ args: []string{"-virtual-storage=virtual-storage"},
+ error: requiredParameterError("repository"),
+ },
+ {
+ desc: "missing replication-factor",
+ args: []string{"-virtual-storage=virtual-storage", "-repository=relative-path"},
+ error: requiredParameterError("replication-factor"),
+ },
+ {
+ desc: "replication factor too small",
+ args: []string{"-virtual-storage=virtual-storage", "-repository=relative-path", "-replication-factor=0"},
+ error: status.Error(codes.Unknown, "set replication factor: attempted to set replication factor 0 but minimum is 1"),
+ },
+ {
+ desc: "replication factor too big",
+ args: []string{"-virtual-storage=virtual-storage", "-repository=relative-path", "-replication-factor=3"},
+ error: status.Error(codes.Unknown, "set replication factor: attempted to set replication factor 3 but virtual storage only contains 2 storages"),
+ },
+ {
+ desc: "virtual storage not found",
+ args: []string{"-virtual-storage=non-existent", "-repository=relative-path", "-replication-factor=2"},
+ error: status.Error(codes.Unknown, `set replication factor: unknown virtual storage: "non-existent"`),
+ },
+ {
+ desc: "repository not found",
+ args: []string{"-virtual-storage=virtual-storage", "-repository=non-existent", "-replication-factor=2"},
+ error: status.Error(codes.Unknown, `set replication factor: repository "virtual-storage"/"non-existent" not found`),
+ },
+ {
+ desc: "successfully set",
+ args: []string{"-virtual-storage=virtual-storage", "-repository=relative-path", "-replication-factor=2"},
+ stdout: "current assignments: primary, secondary",
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ db := getDB(t)
+
+ store := datastore.NewAssignmentStore(db, map[string][]string{"virtual-storage": {"primary", "secondary"}})
+
+ // create a repository record
+ require.NoError(t,
+ datastore.NewPostgresRepositoryStore(db, nil).SetGeneration(ctx, "virtual-storage", "relative-path", "primary", 0),
+ )
+
+ ln, clean := listenAndServe(t, []svcRegistrar{registerPraefectInfoServer(
+ info.NewServer(nil, config.Config{}, nil, nil, store),
+ )})
+ defer clean()
+
+ stdout := &bytes.Buffer{}
+ cmd := &setReplicationFactorSubcommand{stdout: stdout}
+ fs := cmd.FlagSet()
+ require.NoError(t, fs.Parse(tc.args))
+ err := cmd.Exec(fs, config.Config{
+ SocketPath: ln.Addr().String(),
+ })
+ require.Equal(t, tc.error, err)
+ require.Equal(t, tc.stdout, stdout.String())
+ })
+ }
+}
diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go
index 0fceb176d..899b67758 100644
--- a/internal/praefect/auth_test.go
+++ b/internal/praefect/auth_test.go
@@ -178,7 +178,7 @@ func runServer(t *testing.T, token string, required bool) (*grpc.Server, string,
coordinator := NewCoordinator(queue, nil, NewNodeManagerRouter(nodeMgr, nil), txMgr, conf, registry)
- srv := NewGRPCServer(conf, logEntry, registry, coordinator.StreamDirector, nodeMgr, txMgr, queue, nil)
+ srv := NewGRPCServer(conf, logEntry, registry, coordinator.StreamDirector, nodeMgr, txMgr, queue, nil, nil)
serverSocketPath := testhelper.GetTemporaryGitalySocketFileName()
diff --git a/internal/praefect/datastore/assignment.go b/internal/praefect/datastore/assignment.go
index 6bab0a594..77904c50c 100644
--- a/internal/praefect/datastore/assignment.go
+++ b/internal/praefect/datastore/assignment.go
@@ -12,6 +12,18 @@ func newVirtualStorageNotFoundError(virtualStorage string) error {
return fmt.Errorf("virtual storage %q not found", virtualStorage)
}
+func newUnattainableReplicationFactorError(attempted, maximum int) error {
+ return fmt.Errorf("attempted to set replication factor %d but virtual storage only contains %d storages", attempted, maximum)
+}
+
+func newMinimumReplicationFactorError(replicationFactor int) error {
+ return fmt.Errorf("attempted to set replication factor %d but minimum is 1", replicationFactor)
+}
+
+func newRepositoryNotFoundError(virtualStorage, relativePath string) error {
+ return fmt.Errorf("repository %q/%q not found", virtualStorage, relativePath)
+}
+
// AssignmentStore manages host assignments in Postgres.
type AssignmentStore struct {
db glsql.Querier
@@ -61,3 +73,120 @@ AND storage = ANY($3)
return assignedStorages, nil
}
+
+// SetReplicationFactor assigns or unassigns a repository's host nodes until the desired replication factor is met.
+// Please see the protobuf documentation of the method for details.
+func (s AssignmentStore) SetReplicationFactor(ctx context.Context, virtualStorage, relativePath string, replicationFactor int) ([]string, error) {
+ candidateStorages, ok := s.configuredStorages[virtualStorage]
+ if !ok {
+ return nil, fmt.Errorf("unknown virtual storage: %q", virtualStorage)
+ }
+
+ if replicationFactor < 1 {
+ return nil, newMinimumReplicationFactorError(replicationFactor)
+ }
+
+ if max := len(candidateStorages); replicationFactor > max {
+ return nil, newUnattainableReplicationFactorError(replicationFactor, max)
+ }
+
+ // The query works as follows:
+ //
+ // 1. `repository` CTE locks the repository's record for the duration of the update.
+ // This prevents concurrent updates to the `repository_assignments` table for the given
+ // repository. It is not sufficient to rely on row locks in `repository_assignments`
+ // as there might be rows being inserted or deleted in another transaction that
+ // our transaction does not lock. This could be the case if the replication factor
+ // is being increased concurrently from two different nodes and they assign different
+ // storages.
+ //
+ // 2. `existing_assignments` CTE gets the existing assignments for the repository. While
+ // there may be assignments in the database for storage nodes that were removed from the
+ // cluster, the query filters them out.
+ //
+ // 3. `created_assignments` CTE assigns new hosts to the repository if the replication
+ // factor has been increased. Random storages which are not yet assigned to the repository
+ // are picked until the replication factor is met. The primary of a repository is always
+ // assigned first.
+ //
+ // 4. `removed_assignments` CTE removes host assignments if the replication factor has been
+ // decreased. Primary is never removed as it needs a copy of the repository in order to
+ // accept writes. Random hosts are removed until the replication factor is met.
+ //
+ // 6. Finally we return the current set of assignments. CTE updates are not visible in the
+ // tables during the transaction. To account for that, we filter out removed assignments
+ // from the existing assignments. If the replication factor was increased, we'll include the
+ // created assignments. If the replication factor did not change, the query returns the
+ // current assignments.
+ rows, err := s.db.QueryContext(ctx, `
+WITH repository AS (
+ SELECT virtual_storage, relative_path, "primary"
+ FROM repositories
+ WHERE virtual_storage = $1
+ AND relative_path = $2
+ FOR UPDATE
+),
+
+existing_assignments AS (
+ SELECT storage
+ FROM repository
+ JOIN repository_assignments USING (virtual_storage, relative_path)
+ WHERE storage = ANY($4::text[])
+),
+
+created_assignments AS (
+ INSERT INTO repository_assignments
+ SELECT virtual_storage, relative_path, storage
+ FROM repository
+ CROSS JOIN ( SELECT unnest($4::text[]) AS storage ) AS configured_storages
+ WHERE storage NOT IN ( SELECT storage FROM existing_assignments )
+ ORDER BY CASE WHEN storage = "primary" THEN 1 ELSE 0 END DESC, random()
+ LIMIT ( SELECT GREATEST(COUNT(*), $3) - COUNT(*) FROM existing_assignments )
+ RETURNING storage
+),
+
+removed_assignments AS (
+ DELETE FROM repository_assignments
+ USING (
+ SELECT virtual_storage, relative_path, storage
+ FROM repository, existing_assignments
+ WHERE storage != "primary"
+ ORDER BY random()
+ LIMIT ( SELECT COUNT(*) - LEAST(COUNT(*), $3) FROM existing_assignments )
+ ) AS removals
+ WHERE repository_assignments.virtual_storage = removals.virtual_storage
+ AND repository_assignments.relative_path = removals.relative_path
+ AND repository_assignments.storage = removals.storage
+ RETURNING removals.storage
+)
+
+SELECT storage
+FROM existing_assignments
+WHERE storage NOT IN ( SELECT storage FROM removed_assignments )
+UNION
+SELECT storage
+FROM created_assignments
+ORDER BY storage
+ `, virtualStorage, relativePath, replicationFactor, pq.StringArray(candidateStorages))
+ if err != nil {
+ return nil, fmt.Errorf("query: %w", err)
+ }
+
+ defer rows.Close()
+
+ var storages []string
+ for rows.Next() {
+ var storage string
+ if err := rows.Scan(&storage); err != nil {
+ return nil, fmt.Errorf("scan: %w", err)
+ }
+
+ storages = append(storages, storage)
+ }
+
+ if len(storages) == 0 {
+ return nil, newRepositoryNotFoundError(virtualStorage, relativePath)
+ }
+
+ return storages, rows.Err()
+}
diff --git a/internal/praefect/datastore/assignment_test.go b/internal/praefect/datastore/assignment_test.go
index 5baeda66d..96f9add51 100644
--- a/internal/praefect/datastore/assignment_test.go
+++ b/internal/praefect/datastore/assignment_test.go
@@ -98,3 +98,133 @@ func TestAssignmentStore_GetHostAssignments(t *testing.T) {
})
}
}
+
+func TestAssignmentStore_SetReplicationFactor(t *testing.T) {
+ type matcher func(testing.TB, []string)
+
+ equal := func(expected []string) matcher {
+ return func(t testing.TB, actual []string) {
+ t.Helper()
+ require.Equal(t, expected, actual)
+ }
+ }
+
+ contains := func(expecteds ...[]string) matcher {
+ return func(t testing.TB, actual []string) {
+ t.Helper()
+ require.Contains(t, expecteds, actual)
+ }
+ }
+
+ for _, tc := range []struct {
+ desc string
+ existingAssignments []string
+ nonExistentRepository bool
+ replicationFactor int
+ requireStorages matcher
+ error error
+ }{
+ {
+ desc: "increase replication factor of non-existent repository",
+ nonExistentRepository: true,
+ replicationFactor: 1,
+ error: newRepositoryNotFoundError("virtual-storage", "relative-path"),
+ },
+ {
+ desc: "primary prioritized when setting the first assignments",
+ replicationFactor: 1,
+ requireStorages: equal([]string{"primary"}),
+ },
+ {
+ desc: "increasing replication factor ignores unconfigured storages",
+ existingAssignments: []string{"unconfigured-storage"},
+ replicationFactor: 1,
+ requireStorages: equal([]string{"primary"}),
+ },
+ {
+ desc: "replication factor already achieved",
+ existingAssignments: []string{"primary", "secondary-1"},
+ replicationFactor: 2,
+ requireStorages: equal([]string{"primary", "secondary-1"}),
+ },
+ {
+ desc: "increase replication factor by a step",
+ existingAssignments: []string{"primary"},
+ replicationFactor: 2,
+ requireStorages: contains([]string{"primary", "secondary-1"}, []string{"primary", "secondary-2"}),
+ },
+ {
+ desc: "increase replication factor to maximum",
+ existingAssignments: []string{"primary"},
+ replicationFactor: 3,
+ requireStorages: equal([]string{"primary", "secondary-1", "secondary-2"}),
+ },
+ {
+ desc: "increased replication factor unattainable",
+ existingAssignments: []string{"primary"},
+ replicationFactor: 4,
+ error: newUnattainableReplicationFactorError(4, 3),
+ },
+ {
+ desc: "decreasing replication factor ignores unconfigured storages",
+ existingAssignments: []string{"secondary-1", "unconfigured-storage"},
+ replicationFactor: 1,
+ requireStorages: equal([]string{"secondary-1"}),
+ },
+ {
+ desc: "decrease replication factor by a step",
+ existingAssignments: []string{"primary", "secondary-1", "secondary-2"},
+ replicationFactor: 2,
+ requireStorages: contains([]string{"primary", "secondary-1"}, []string{"primary", "secondary-2"}),
+ },
+ {
+ desc: "decrease replication factor to minimum",
+ existingAssignments: []string{"primary", "secondary-1", "secondary-2"},
+ replicationFactor: 1,
+ requireStorages: equal([]string{"primary"}),
+ },
+ {
+ desc: "minimum replication factor is enforced",
+ replicationFactor: 0,
+ error: newMinimumReplicationFactorError(0),
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ db := getDB(t)
+
+ configuredStorages := map[string][]string{"virtual-storage": {"primary", "secondary-1", "secondary-2"}}
+
+ if !tc.nonExistentRepository {
+ _, err := db.ExecContext(ctx, `
+ INSERT INTO repositories (virtual_storage, relative_path, "primary")
+ VALUES ('virtual-storage', 'relative-path', 'primary')
+ `)
+ require.NoError(t, err)
+ }
+
+ for _, storage := range tc.existingAssignments {
+ _, err := db.ExecContext(ctx, `
+ INSERT INTO repository_assignments VALUES ('virtual-storage', 'relative-path', $1)
+ `, storage)
+ require.NoError(t, err)
+ }
+
+ store := NewAssignmentStore(db, configuredStorages)
+
+ setStorages, err := store.SetReplicationFactor(ctx, "virtual-storage", "relative-path", tc.replicationFactor)
+ require.Equal(t, tc.error, err)
+ if tc.error != nil {
+ return
+ }
+
+ tc.requireStorages(t, setStorages)
+
+ assignedStorages, err := store.GetHostAssignments(ctx, "virtual-storage", "relative-path")
+ require.NoError(t, err)
+ tc.requireStorages(t, assignedStorages)
+ })
+ }
+}
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index b742ca199..d469003c2 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -242,7 +242,7 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp
NodeSetFromNodeManager(opt.withNodeMgr),
)
- prf := NewGRPCServer(conf, opt.withLogger, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, opt.withNodeMgr, opt.withTxMgr, opt.withQueue, rs)
+ prf := NewGRPCServer(conf, opt.withLogger, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, opt.withNodeMgr, opt.withTxMgr, opt.withQueue, rs, nil)
listener, port := listenAvailPort(t)
t.Logf("proxy listening on port %d", port)
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index f2aa9f03d..a50bee79c 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -363,7 +363,7 @@ func TestPropagateReplicationJob(t *testing.T) {
replmgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queue, rs, nodeMgr, NodeSetFromNodeManager(nodeMgr))
- prf := NewGRPCServer(conf, logEntry, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs)
+ prf := NewGRPCServer(conf, logEntry, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, nil)
listener, port := listenAvailPort(t)
ctx, cancel := testhelper.Context()
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index ea263b602..e82bd7826 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -46,6 +46,7 @@ func NewGRPCServer(
txMgr *transactions.Manager,
queue datastore.ReplicationEventQueue,
rs datastore.RepositoryStore,
+ rfs info.ReplicationFactorSetter,
grpcOpts ...grpc.ServerOption,
) *grpc.Server {
ctxTagOpts := []grpc_ctxtags.Option{
@@ -93,7 +94,7 @@ func NewGRPCServer(
warnDupeAddrs(logger, conf)
srv := grpc.NewServer(grpcOpts...)
- registerServices(srv, nodeMgr, txMgr, conf, queue, rs)
+ registerServices(srv, nodeMgr, txMgr, conf, queue, rs, rfs)
return srv
}
@@ -105,10 +106,10 @@ func proxyRequiredOpts(director proxy.StreamDirector) []grpc.ServerOption {
}
// registerServices registers services praefect needs to handle RPCs on its own.
-func registerServices(srv *grpc.Server, nm nodes.Manager, tm *transactions.Manager, conf config.Config, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore) {
+func registerServices(srv *grpc.Server, nm nodes.Manager, tm *transactions.Manager, conf config.Config, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore, rfs info.ReplicationFactorSetter) {
// ServerServiceServer is necessary for the ServerInfo RPC
gitalypb.RegisterServerServiceServer(srv, server.NewServer(conf, nm))
- gitalypb.RegisterPraefectInfoServiceServer(srv, info.NewServer(nm, conf, queue, rs))
+ gitalypb.RegisterPraefectInfoServiceServer(srv, info.NewServer(nm, conf, queue, rs, rfs))
gitalypb.RegisterRefTransactionServer(srv, transaction.NewServer(tm))
healthpb.RegisterHealthServer(srv, health.NewServer())
diff --git a/internal/praefect/server_factory.go b/internal/praefect/server_factory.go
index 568997089..79cb25699 100644
--- a/internal/praefect/server_factory.go
+++ b/internal/praefect/server_factory.go
@@ -12,6 +12,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/service/info"
"gitlab.com/gitlab-org/gitaly/internal/praefect/transactions"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
@@ -26,6 +27,7 @@ func NewServerFactory(
txMgr *transactions.Manager,
queue datastore.ReplicationEventQueue,
rs datastore.RepositoryStore,
+ rfs info.ReplicationFactorSetter,
registry *protoregistry.Registry,
) *ServerFactory {
return &ServerFactory{
@@ -36,6 +38,7 @@ func NewServerFactory(
txMgr: txMgr,
queue: queue,
rs: rs,
+ rfs: rfs,
registry: registry,
}
}
@@ -50,6 +53,7 @@ type ServerFactory struct {
txMgr *transactions.Manager
queue datastore.ReplicationEventQueue
rs datastore.RepositoryStore
+ rfs info.ReplicationFactorSetter
registry *protoregistry.Registry
secure, insecure []*grpc.Server
}
@@ -116,6 +120,7 @@ func (s *ServerFactory) createGRPC(grpcOpts ...grpc.ServerOption) *grpc.Server {
s.txMgr,
s.queue,
s.rs,
+ s.rfs,
grpcOpts...,
)
}
diff --git a/internal/praefect/server_factory_test.go b/internal/praefect/server_factory_test.go
index b0fb63e06..d1601a566 100644
--- a/internal/praefect/server_factory_test.go
+++ b/internal/praefect/server_factory_test.go
@@ -117,7 +117,7 @@ func TestServerFactory(t *testing.T) {
}
t.Run("insecure", func(t *testing.T) {
- praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, registry)
+ praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, nil, registry)
defer praefectServerFactory.Stop()
listener, err := net.Listen(starter.TCP, "localhost:0")
@@ -146,7 +146,7 @@ func TestServerFactory(t *testing.T) {
})
t.Run("secure", func(t *testing.T) {
- praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, registry)
+ praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, nil, registry)
defer praefectServerFactory.Stop()
listener, err := net.Listen(starter.TCP, "localhost:0")
@@ -185,7 +185,7 @@ func TestServerFactory(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, registry)
+ praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, nil, registry)
defer praefectServerFactory.Stop()
// start with tcp address
@@ -253,7 +253,7 @@ func TestServerFactory(t *testing.T) {
t.Run("tls key path invalid", func(t *testing.T) {
badTLSKeyPath := conf
badTLSKeyPath.TLS.KeyPath = "invalid"
- praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, registry)
+ praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, nil, registry)
err := praefectServerFactory.Serve(nil, true)
require.EqualError(t, err, "load certificate key pair: open invalid: no such file or directory")
@@ -262,7 +262,7 @@ func TestServerFactory(t *testing.T) {
t.Run("tls cert path invalid", func(t *testing.T) {
badTLSKeyPath := conf
badTLSKeyPath.TLS.CertPath = "invalid"
- praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, registry)
+ praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, nil, registry)
err := praefectServerFactory.Serve(nil, true)
require.EqualError(t, err, "load certificate key pair: open invalid: no such file or directory")
diff --git a/internal/praefect/service/info/consistencycheck_test.go b/internal/praefect/service/info/consistencycheck_test.go
index ee1b2a280..1044ae915 100644
--- a/internal/praefect/service/info/consistencycheck_test.go
+++ b/internal/praefect/service/info/consistencycheck_test.go
@@ -115,7 +115,7 @@ func TestServer_ConsistencyCheck(t *testing.T) {
grpcSrv := grpc.NewServer()
defer grpcSrv.Stop()
- gitalypb.RegisterPraefectInfoServiceServer(grpcSrv, NewServer(nm, conf, queue, nil))
+ gitalypb.RegisterPraefectInfoServiceServer(grpcSrv, NewServer(nm, conf, queue, nil, nil))
go grpcSrv.Serve(praefectListener)
infoConn, err := client.Dial("unix://"+praefectAddr, nil)
diff --git a/internal/praefect/service/info/replication_factor.go b/internal/praefect/service/info/replication_factor.go
new file mode 100644
index 000000000..c38a2cb68
--- /dev/null
+++ b/internal/praefect/service/info/replication_factor.go
@@ -0,0 +1,28 @@
+package info
+
+import (
+ "context"
+ "fmt"
+
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+)
+
+// ReplicationFactorSetter sets a repository's replication factor
+type ReplicationFactorSetter interface {
+ // SetReplicationFactor assigns or unassigns a repository's host nodes until the desired replication factor is met.
+ // Please see the protobuf documentation of the method for details.
+ SetReplicationFactor(ctx context.Context, virtualStorage, relativePath string, replicationFactor int) ([]string, error)
+}
+
+func (s *Server) SetReplicationFactor(ctx context.Context, req *gitalypb.SetReplicationFactorRequest) (*gitalypb.SetReplicationFactorResponse, error) {
+ if s.rfs == nil {
+ return nil, fmt.Errorf("setting replication factor is only possible when Praefect is ran with 'per_repository' elector")
+ }
+
+ storages, err := s.rfs.SetReplicationFactor(ctx, req.VirtualStorage, req.RelativePath, int(req.ReplicationFactor))
+ if err != nil {
+ return nil, fmt.Errorf("set replication factor: %w", err)
+ }
+
+ return &gitalypb.SetReplicationFactorResponse{Storages: storages}, nil
+}
diff --git a/internal/praefect/service/info/server.go b/internal/praefect/service/info/server.go
index 9a812a4d1..9cb67efa3 100644
--- a/internal/praefect/service/info/server.go
+++ b/internal/praefect/service/info/server.go
@@ -16,15 +16,18 @@ type Server struct {
conf config.Config
queue datastore.ReplicationEventQueue
rs datastore.RepositoryStore
+
+ rfs ReplicationFactorSetter
}
// NewServer creates a new instance of a grpc InfoServiceServer
-func NewServer(nodeMgr nodes.Manager, conf config.Config, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore) gitalypb.PraefectInfoServiceServer {
+func NewServer(nodeMgr nodes.Manager, conf config.Config, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore, rfs ReplicationFactorSetter) gitalypb.PraefectInfoServiceServer {
return &Server{
nodeMgr: nodeMgr,
conf: conf,
queue: queue,
rs: rs,
+ rfs: rfs,
}
}
diff --git a/proto/go/gitalypb/praefect.pb.go b/proto/go/gitalypb/praefect.pb.go
index d796800d4..f47e84d3e 100644
--- a/proto/go/gitalypb/praefect.pb.go
+++ b/proto/go/gitalypb/praefect.pb.go
@@ -24,6 +24,106 @@ var _ = math.Inf
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
+// SetReplicationFactorRequest sets the desired replication factor for a repository.
+type SetReplicationFactorRequest struct {
+ // virtual_storage is the virtual storage the repository is located in
+ VirtualStorage string `protobuf:"bytes,1,opt,name=virtual_storage,json=virtualStorage,proto3" json:"virtual_storage,omitempty"`
+ // relative_path is the relative path of the repository
+ RelativePath string `protobuf:"bytes,2,opt,name=relative_path,json=relativePath,proto3" json:"relative_path,omitempty"`
+ // replication_factor is the desired replication factor. Replication must be equal or greater than 1.
+ ReplicationFactor int32 `protobuf:"varint,3,opt,name=replication_factor,json=replicationFactor,proto3" json:"replication_factor,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *SetReplicationFactorRequest) Reset() { *m = SetReplicationFactorRequest{} }
+func (m *SetReplicationFactorRequest) String() string { return proto.CompactTextString(m) }
+func (*SetReplicationFactorRequest) ProtoMessage() {}
+func (*SetReplicationFactorRequest) Descriptor() ([]byte, []int) {
+ return fileDescriptor_d32bf44842ead735, []int{0}
+}
+
+func (m *SetReplicationFactorRequest) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_SetReplicationFactorRequest.Unmarshal(m, b)
+}
+func (m *SetReplicationFactorRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_SetReplicationFactorRequest.Marshal(b, m, deterministic)
+}
+func (m *SetReplicationFactorRequest) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_SetReplicationFactorRequest.Merge(m, src)
+}
+func (m *SetReplicationFactorRequest) XXX_Size() int {
+ return xxx_messageInfo_SetReplicationFactorRequest.Size(m)
+}
+func (m *SetReplicationFactorRequest) XXX_DiscardUnknown() {
+ xxx_messageInfo_SetReplicationFactorRequest.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_SetReplicationFactorRequest proto.InternalMessageInfo
+
+func (m *SetReplicationFactorRequest) GetVirtualStorage() string {
+ if m != nil {
+ return m.VirtualStorage
+ }
+ return ""
+}
+
+func (m *SetReplicationFactorRequest) GetRelativePath() string {
+ if m != nil {
+ return m.RelativePath
+ }
+ return ""
+}
+
+func (m *SetReplicationFactorRequest) GetReplicationFactor() int32 {
+ if m != nil {
+ return m.ReplicationFactor
+ }
+ return 0
+}
+
+// SetReplicationFactorResponse returns the assigned hosts after setting the desired replication factor.
+type SetReplicationFactorResponse struct {
+ // storages are the storages assigned to host the repository.
+ Storages []string `protobuf:"bytes,1,rep,name=storages,proto3" json:"storages,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *SetReplicationFactorResponse) Reset() { *m = SetReplicationFactorResponse{} }
+func (m *SetReplicationFactorResponse) String() string { return proto.CompactTextString(m) }
+func (*SetReplicationFactorResponse) ProtoMessage() {}
+func (*SetReplicationFactorResponse) Descriptor() ([]byte, []int) {
+ return fileDescriptor_d32bf44842ead735, []int{1}
+}
+
+func (m *SetReplicationFactorResponse) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_SetReplicationFactorResponse.Unmarshal(m, b)
+}
+func (m *SetReplicationFactorResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_SetReplicationFactorResponse.Marshal(b, m, deterministic)
+}
+func (m *SetReplicationFactorResponse) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_SetReplicationFactorResponse.Merge(m, src)
+}
+func (m *SetReplicationFactorResponse) XXX_Size() int {
+ return xxx_messageInfo_SetReplicationFactorResponse.Size(m)
+}
+func (m *SetReplicationFactorResponse) XXX_DiscardUnknown() {
+ xxx_messageInfo_SetReplicationFactorResponse.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_SetReplicationFactorResponse proto.InternalMessageInfo
+
+func (m *SetReplicationFactorResponse) GetStorages() []string {
+ if m != nil {
+ return m.Storages
+ }
+ return nil
+}
+
type SetAuthoritativeStorageRequest struct {
VirtualStorage string `protobuf:"bytes,1,opt,name=virtual_storage,json=virtualStorage,proto3" json:"virtual_storage,omitempty"`
RelativePath string `protobuf:"bytes,2,opt,name=relative_path,json=relativePath,proto3" json:"relative_path,omitempty"`
@@ -37,7 +137,7 @@ func (m *SetAuthoritativeStorageRequest) Reset() { *m = SetAuthoritative
func (m *SetAuthoritativeStorageRequest) String() string { return proto.CompactTextString(m) }
func (*SetAuthoritativeStorageRequest) ProtoMessage() {}
func (*SetAuthoritativeStorageRequest) Descriptor() ([]byte, []int) {
- return fileDescriptor_d32bf44842ead735, []int{0}
+ return fileDescriptor_d32bf44842ead735, []int{2}
}
func (m *SetAuthoritativeStorageRequest) XXX_Unmarshal(b []byte) error {
@@ -89,7 +189,7 @@ func (m *SetAuthoritativeStorageResponse) Reset() { *m = SetAuthoritativ
func (m *SetAuthoritativeStorageResponse) String() string { return proto.CompactTextString(m) }
func (*SetAuthoritativeStorageResponse) ProtoMessage() {}
func (*SetAuthoritativeStorageResponse) Descriptor() ([]byte, []int) {
- return fileDescriptor_d32bf44842ead735, []int{1}
+ return fileDescriptor_d32bf44842ead735, []int{3}
}
func (m *SetAuthoritativeStorageResponse) XXX_Unmarshal(b []byte) error {
@@ -126,7 +226,7 @@ func (m *DatalossCheckRequest) Reset() { *m = DatalossCheckRequest{} }
func (m *DatalossCheckRequest) String() string { return proto.CompactTextString(m) }
func (*DatalossCheckRequest) ProtoMessage() {}
func (*DatalossCheckRequest) Descriptor() ([]byte, []int) {
- return fileDescriptor_d32bf44842ead735, []int{2}
+ return fileDescriptor_d32bf44842ead735, []int{4}
}
func (m *DatalossCheckRequest) XXX_Unmarshal(b []byte) error {
@@ -173,7 +273,7 @@ func (m *DatalossCheckResponse) Reset() { *m = DatalossCheckResponse{} }
func (m *DatalossCheckResponse) String() string { return proto.CompactTextString(m) }
func (*DatalossCheckResponse) ProtoMessage() {}
func (*DatalossCheckResponse) Descriptor() ([]byte, []int) {
- return fileDescriptor_d32bf44842ead735, []int{3}
+ return fileDescriptor_d32bf44842ead735, []int{5}
}
func (m *DatalossCheckResponse) XXX_Unmarshal(b []byte) error {
@@ -219,7 +319,7 @@ func (m *DatalossCheckResponse_Repository) Reset() { *m = DatalossCheckR
func (m *DatalossCheckResponse_Repository) String() string { return proto.CompactTextString(m) }
func (*DatalossCheckResponse_Repository) ProtoMessage() {}
func (*DatalossCheckResponse_Repository) Descriptor() ([]byte, []int) {
- return fileDescriptor_d32bf44842ead735, []int{3, 0}
+ return fileDescriptor_d32bf44842ead735, []int{5, 0}
}
func (m *DatalossCheckResponse_Repository) XXX_Unmarshal(b []byte) error {
@@ -286,7 +386,7 @@ func (m *DatalossCheckResponse_Repository_Storage) Reset() {
func (m *DatalossCheckResponse_Repository_Storage) String() string { return proto.CompactTextString(m) }
func (*DatalossCheckResponse_Repository_Storage) ProtoMessage() {}
func (*DatalossCheckResponse_Repository_Storage) Descriptor() ([]byte, []int) {
- return fileDescriptor_d32bf44842ead735, []int{3, 0, 0}
+ return fileDescriptor_d32bf44842ead735, []int{5, 0, 0}
}
func (m *DatalossCheckResponse_Repository_Storage) XXX_Unmarshal(b []byte) error {
@@ -339,7 +439,7 @@ func (m *RepositoryReplicasRequest) Reset() { *m = RepositoryReplicasReq
func (m *RepositoryReplicasRequest) String() string { return proto.CompactTextString(m) }
func (*RepositoryReplicasRequest) ProtoMessage() {}
func (*RepositoryReplicasRequest) Descriptor() ([]byte, []int) {
- return fileDescriptor_d32bf44842ead735, []int{4}
+ return fileDescriptor_d32bf44842ead735, []int{6}
}
func (m *RepositoryReplicasRequest) XXX_Unmarshal(b []byte) error {
@@ -379,7 +479,7 @@ func (m *RepositoryReplicasResponse) Reset() { *m = RepositoryReplicasRe
func (m *RepositoryReplicasResponse) String() string { return proto.CompactTextString(m) }
func (*RepositoryReplicasResponse) ProtoMessage() {}
func (*RepositoryReplicasResponse) Descriptor() ([]byte, []int) {
- return fileDescriptor_d32bf44842ead735, []int{5}
+ return fileDescriptor_d32bf44842ead735, []int{7}
}
func (m *RepositoryReplicasResponse) XXX_Unmarshal(b []byte) error {
@@ -430,7 +530,7 @@ func (m *RepositoryReplicasResponse_RepositoryDetails) String() string {
}
func (*RepositoryReplicasResponse_RepositoryDetails) ProtoMessage() {}
func (*RepositoryReplicasResponse_RepositoryDetails) Descriptor() ([]byte, []int) {
- return fileDescriptor_d32bf44842ead735, []int{5, 0}
+ return fileDescriptor_d32bf44842ead735, []int{7, 0}
}
func (m *RepositoryReplicasResponse_RepositoryDetails) XXX_Unmarshal(b []byte) error {
@@ -486,7 +586,7 @@ func (m *ConsistencyCheckRequest) Reset() { *m = ConsistencyCheckRequest
func (m *ConsistencyCheckRequest) String() string { return proto.CompactTextString(m) }
func (*ConsistencyCheckRequest) ProtoMessage() {}
func (*ConsistencyCheckRequest) Descriptor() ([]byte, []int) {
- return fileDescriptor_d32bf44842ead735, []int{6}
+ return fileDescriptor_d32bf44842ead735, []int{8}
}
func (m *ConsistencyCheckRequest) XXX_Unmarshal(b []byte) error {
@@ -553,7 +653,7 @@ func (m *ConsistencyCheckResponse) Reset() { *m = ConsistencyCheckRespon
func (m *ConsistencyCheckResponse) String() string { return proto.CompactTextString(m) }
func (*ConsistencyCheckResponse) ProtoMessage() {}
func (*ConsistencyCheckResponse) Descriptor() ([]byte, []int) {
- return fileDescriptor_d32bf44842ead735, []int{7}
+ return fileDescriptor_d32bf44842ead735, []int{9}
}
func (m *ConsistencyCheckResponse) XXX_Unmarshal(b []byte) error {
@@ -610,6 +710,8 @@ func (m *ConsistencyCheckResponse) GetReferenceStorage() string {
}
func init() {
+ proto.RegisterType((*SetReplicationFactorRequest)(nil), "gitaly.SetReplicationFactorRequest")
+ proto.RegisterType((*SetReplicationFactorResponse)(nil), "gitaly.SetReplicationFactorResponse")
proto.RegisterType((*SetAuthoritativeStorageRequest)(nil), "gitaly.SetAuthoritativeStorageRequest")
proto.RegisterType((*SetAuthoritativeStorageResponse)(nil), "gitaly.SetAuthoritativeStorageResponse")
proto.RegisterType((*DatalossCheckRequest)(nil), "gitaly.DatalossCheckRequest")
@@ -626,55 +728,60 @@ func init() {
func init() { proto.RegisterFile("praefect.proto", fileDescriptor_d32bf44842ead735) }
var fileDescriptor_d32bf44842ead735 = []byte{
- // 757 bytes of a gzipped FileDescriptorProto
- 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x55, 0x4d, 0x6f, 0xd3, 0x4c,
- 0x10, 0x96, 0x93, 0xbc, 0xad, 0x3b, 0xfd, 0xde, 0xb7, 0x7d, 0x9b, 0xd7, 0x94, 0x7e, 0x18, 0x41,
- 0x23, 0x41, 0x93, 0x2a, 0x45, 0xe2, 0x0a, 0x6d, 0x2f, 0x45, 0x15, 0x8d, 0x5c, 0x09, 0x24, 0x38,
- 0x58, 0x6b, 0x7b, 0x9b, 0x6c, 0xd9, 0x78, 0xcd, 0xee, 0xa6, 0x92, 0x8f, 0x9c, 0x91, 0xb8, 0xf2,
- 0x03, 0xfa, 0x83, 0xf8, 0x25, 0x48, 0x9c, 0x38, 0x23, 0x7f, 0xac, 0x9b, 0x26, 0x4e, 0x0b, 0xbd,
- 0x79, 0x67, 0x9e, 0x79, 0x66, 0xe6, 0x99, 0xf1, 0x2e, 0x2c, 0x44, 0x02, 0x93, 0x73, 0xe2, 0xab,
- 0x66, 0x24, 0xb8, 0xe2, 0x68, 0xaa, 0x4b, 0x15, 0x66, 0xb1, 0x05, 0x8c, 0x86, 0xb9, 0xcd, 0x9a,
- 0x93, 0x3d, 0x2c, 0x48, 0x90, 0x9d, 0xec, 0x2b, 0x03, 0x36, 0xce, 0x88, 0x7a, 0x35, 0x50, 0x3d,
- 0x2e, 0xa8, 0xc2, 0x8a, 0x5e, 0x92, 0x33, 0xc5, 0x05, 0xee, 0x12, 0x87, 0x7c, 0x1a, 0x10, 0xa9,
- 0xd0, 0x0e, 0x2c, 0x5e, 0x52, 0xa1, 0x06, 0x98, 0xb9, 0x32, 0xf3, 0xd4, 0x8d, 0x2d, 0xa3, 0x31,
- 0xe3, 0x2c, 0xe4, 0xe6, 0x1c, 0x8f, 0x1e, 0xc1, 0xbc, 0x20, 0x2c, 0xa5, 0x70, 0x23, 0xac, 0x7a,
- 0xf5, 0x4a, 0x0a, 0x9b, 0xd3, 0xc6, 0x0e, 0x56, 0x3d, 0xb4, 0x0f, 0xab, 0x78, 0x38, 0x59, 0xc1,
- 0x59, 0x4d, 0xc1, 0x2b, 0xb8, 0xa4, 0x12, 0x7b, 0x1b, 0x36, 0x27, 0x16, 0x29, 0x23, 0x1e, 0x4a,
- 0x62, 0x7f, 0x36, 0x60, 0xe5, 0x08, 0x2b, 0xcc, 0xb8, 0x94, 0x87, 0x3d, 0xe2, 0x7f, 0xfc, 0xeb,
- 0xf2, 0x5f, 0xc2, 0x3a, 0x0d, 0x7d, 0x36, 0x08, 0x92, 0xea, 0x85, 0xa2, 0x98, 0xb1, 0xd8, 0x15,
- 0x24, 0x62, 0xd4, 0xc7, 0x8a, 0x04, 0x69, 0x37, 0xa6, 0x63, 0xe5, 0x98, 0x8e, 0x86, 0x38, 0x05,
- 0xc2, 0xfe, 0x55, 0x81, 0xd5, 0x91, 0x1a, 0xb2, 0xea, 0xd0, 0x09, 0xcc, 0x09, 0x12, 0x71, 0x49,
- 0x15, 0x17, 0x94, 0xc8, 0x7a, 0x65, 0xab, 0xda, 0x98, 0x6d, 0x37, 0x9a, 0xd9, 0x7c, 0x9a, 0xa5,
- 0x41, 0x4d, 0x47, 0x47, 0xc4, 0xce, 0x8d, 0x68, 0xeb, 0x6b, 0x05, 0xe0, 0xda, 0x39, 0xae, 0xbb,
- 0x51, 0xa2, 0xfb, 0x09, 0x98, 0x79, 0xfb, 0x3a, 0xfb, 0xde, 0x9f, 0x66, 0x6f, 0x6a, 0xad, 0x0b,
- 0x06, 0xf4, 0x00, 0x66, 0x04, 0xc1, 0x81, 0xcb, 0x43, 0x16, 0xa7, 0x93, 0x33, 0x1d, 0x33, 0x31,
- 0x9c, 0x86, 0x2c, 0x46, 0x75, 0x98, 0x8e, 0x04, 0xed, 0x63, 0x11, 0xd7, 0x6b, 0x69, 0x25, 0xfa,
- 0x68, 0xbd, 0x85, 0x69, 0xad, 0x36, 0x82, 0x5a, 0x88, 0xfb, 0x7a, 0x16, 0xe9, 0x77, 0xc2, 0xea,
- 0x91, 0x1e, 0x0d, 0x03, 0xd7, 0x8b, 0x53, 0xb9, 0xab, 0x8e, 0x99, 0x19, 0x0e, 0x62, 0x64, 0x81,
- 0x89, 0xa5, 0xa4, 0xdd, 0x90, 0x04, 0x3a, 0xa3, 0x3e, 0xdb, 0xa7, 0xf0, 0xff, 0x90, 0x58, 0xd9,
- 0x40, 0xa4, 0x5e, 0x80, 0x36, 0x40, 0xa1, 0x5e, 0x9c, 0xe6, 0x9b, 0x6d, 0x23, 0xdd, 0xfb, 0x50,
- 0xd8, 0x10, 0xca, 0xbe, 0xaa, 0x80, 0x55, 0xc6, 0x98, 0x8f, 0xf3, 0xcd, 0x75, 0x87, 0x19, 0xdf,
- 0xf3, 0x12, 0xbe, 0x91, 0xa0, 0x21, 0xd7, 0x11, 0x51, 0x98, 0x32, 0x59, 0xe8, 0x82, 0x3a, 0x60,
- 0xe6, 0x8b, 0xa6, 0x87, 0x73, 0x3f, 0xc2, 0x82, 0xc5, 0xf2, 0x61, 0x79, 0xcc, 0x7d, 0x1f, 0x25,
- 0x12, 0xd9, 0xfd, 0x64, 0x2f, 0xe4, 0xa0, 0x9f, 0xff, 0xcf, 0xc5, 0xd9, 0xfe, 0x6e, 0xc0, 0xda,
- 0x21, 0x0f, 0x25, 0x95, 0x8a, 0x84, 0x7e, 0x7c, 0xbf, 0xdf, 0xee, 0x31, 0x2c, 0x28, 0x2c, 0xba,
- 0x44, 0x15, 0xb8, 0x2c, 0xcd, 0x7c, 0x66, 0xd5, 0xb0, 0xa7, 0xb0, 0x2c, 0xc8, 0x39, 0x11, 0x24,
- 0xf4, 0x47, 0xef, 0x8c, 0xa5, 0xc2, 0xa1, 0xc1, 0x2f, 0x60, 0x2d, 0xa0, 0x12, 0x7b, 0x8c, 0xb8,
- 0x82, 0xf8, 0x3c, 0xf4, 0x29, 0x63, 0x14, 0x2b, 0xca, 0xc3, 0x74, 0x23, 0x4d, 0xe7, 0xbf, 0xdc,
- 0xed, 0xdc, 0xf4, 0xda, 0x3f, 0x0c, 0xa8, 0x8f, 0x77, 0x94, 0x4f, 0xfd, 0x19, 0xa0, 0x44, 0x18,
- 0xb7, 0xec, 0x67, 0x5b, 0x4a, 0x3c, 0xce, 0xf0, 0x0f, 0xb7, 0x03, 0x8b, 0x79, 0x5f, 0x23, 0xfa,
- 0xe5, 0xed, 0x1e, 0xe6, 0x56, 0xb4, 0x9b, 0xd0, 0xea, 0xce, 0x0a, 0x6c, 0xd6, 0xda, 0x75, 0xcf,
- 0x05, 0x7c, 0x03, 0x66, 0x93, 0x29, 0xbb, 0x17, 0xdc, 0x73, 0x69, 0x90, 0xf6, 0x53, 0x73, 0x66,
- 0x12, 0xd3, 0x6b, 0xee, 0x1d, 0x07, 0xe5, 0x42, 0xfd, 0x53, 0x2e, 0x54, 0xfb, 0x4b, 0x15, 0xfe,
- 0xed, 0xe4, 0x6f, 0xc6, 0x71, 0x78, 0xce, 0xcf, 0x88, 0xb8, 0xa4, 0x3e, 0x41, 0x1f, 0x00, 0x8d,
- 0x2f, 0x1e, 0xda, 0xbe, 0x6d, 0x29, 0xd3, 0xb1, 0x5b, 0xf6, 0xdd, 0x7b, 0x8b, 0xde, 0xc1, 0xd2,
- 0xa8, 0xc6, 0x68, 0x53, 0xc7, 0x4d, 0xd8, 0x27, 0x6b, 0x6b, 0x32, 0x20, 0xa3, 0xdd, 0x33, 0xd0,
- 0x09, 0xcc, 0xdf, 0xb8, 0xcb, 0xd0, 0xfa, 0x84, 0x2b, 0x2e, 0xa3, 0x7c, 0x78, 0xeb, 0x05, 0x88,
- 0x2e, 0x60, 0x6d, 0xc2, 0xa3, 0x83, 0x9e, 0xe8, 0xc8, 0xdb, 0x9f, 0x4e, 0x6b, 0xe7, 0x4e, 0x5c,
- 0x96, 0xcb, 0xaa, 0xfd, 0xfc, 0xd6, 0x30, 0x0e, 0xf6, 0xde, 0x27, 0x78, 0x86, 0xbd, 0xa6, 0xcf,
- 0xfb, 0xad, 0xec, 0x73, 0x97, 0x8b, 0x6e, 0x2b, 0x63, 0x69, 0xa5, 0x4f, 0x76, 0xab, 0xcb, 0xf3,
- 0x73, 0xe4, 0x79, 0x53, 0xa9, 0x69, 0xff, 0x77, 0x00, 0x00, 0x00, 0xff, 0xff, 0xda, 0xfe, 0xa3,
- 0x74, 0xf9, 0x07, 0x00, 0x00,
+ // 834 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x56, 0x4f, 0x6f, 0xdc, 0x44,
+ 0x14, 0x97, 0x77, 0xd3, 0x76, 0xf3, 0xf2, 0xa7, 0xc9, 0x90, 0x12, 0xe3, 0x86, 0x76, 0xeb, 0x02,
+ 0x59, 0x09, 0xba, 0x89, 0x52, 0x24, 0x24, 0x4e, 0xd0, 0x54, 0x48, 0x45, 0x11, 0x8d, 0x1c, 0x09,
+ 0x24, 0x38, 0x58, 0x63, 0x7b, 0xb2, 0x3b, 0x65, 0xd6, 0x63, 0x66, 0x66, 0x23, 0xf9, 0xc8, 0x17,
+ 0xe0, 0x8a, 0xc4, 0xb5, 0x1f, 0x88, 0x4f, 0x02, 0xe2, 0xc4, 0x19, 0x79, 0xfe, 0x78, 0x9d, 0x5d,
+ 0xef, 0x06, 0x22, 0x71, 0xf3, 0xbc, 0xf7, 0x7b, 0xff, 0x7e, 0xef, 0xcd, 0x1b, 0xc3, 0x76, 0x21,
+ 0x30, 0xb9, 0x24, 0xa9, 0x1a, 0x16, 0x82, 0x2b, 0x8e, 0xee, 0x8e, 0xa8, 0xc2, 0xac, 0x0c, 0x80,
+ 0xd1, 0xdc, 0xca, 0x82, 0x4d, 0x39, 0xc6, 0x82, 0x64, 0xe6, 0x14, 0xfe, 0xe6, 0xc1, 0xc3, 0x0b,
+ 0xa2, 0x22, 0x52, 0x30, 0x9a, 0x62, 0x45, 0x79, 0xfe, 0x15, 0x4e, 0x15, 0x17, 0x11, 0xf9, 0x69,
+ 0x4a, 0xa4, 0x42, 0x87, 0x70, 0xff, 0x8a, 0x0a, 0x35, 0xc5, 0x2c, 0x96, 0x8a, 0x0b, 0x3c, 0x22,
+ 0xbe, 0xd7, 0xf7, 0x06, 0xeb, 0xd1, 0xb6, 0x15, 0x5f, 0x18, 0x29, 0x7a, 0x0a, 0x5b, 0x82, 0x30,
+ 0xac, 0xe8, 0x15, 0x89, 0x0b, 0xac, 0xc6, 0x7e, 0x47, 0xc3, 0x36, 0x9d, 0xf0, 0x1c, 0xab, 0x31,
+ 0x7a, 0x06, 0x48, 0xcc, 0x22, 0xc5, 0x97, 0x3a, 0x94, 0xdf, 0xed, 0x7b, 0x83, 0x3b, 0xd1, 0xae,
+ 0x98, 0xcf, 0x21, 0xfc, 0x1c, 0x0e, 0xda, 0x73, 0x93, 0x05, 0xcf, 0x25, 0x41, 0x01, 0xf4, 0x6c,
+ 0x52, 0xd2, 0xf7, 0xfa, 0xdd, 0xc1, 0x7a, 0x54, 0x9f, 0xc3, 0xb7, 0x1e, 0x3c, 0xba, 0x20, 0xea,
+ 0xcb, 0xa9, 0x1a, 0x73, 0x41, 0x95, 0xce, 0xc1, 0xe6, 0xfa, 0xff, 0xd4, 0xf6, 0x1c, 0x1e, 0xe0,
+ 0x66, 0xb0, 0xda, 0x67, 0x57, 0x83, 0xf7, 0x70, 0x4b, 0x26, 0xe1, 0x13, 0x78, 0xbc, 0x34, 0x49,
+ 0x53, 0x64, 0xf8, 0xb3, 0x07, 0x7b, 0x2f, 0xb1, 0xc2, 0x8c, 0x4b, 0x79, 0x3a, 0x26, 0xe9, 0x8f,
+ 0xff, 0x39, 0xfd, 0x2f, 0xe0, 0x80, 0xe6, 0x29, 0x9b, 0x66, 0x55, 0xf6, 0x42, 0x51, 0xcc, 0x58,
+ 0x19, 0x3b, 0xb6, 0x49, 0xa6, 0xab, 0xe9, 0x45, 0x81, 0xc5, 0x9c, 0x3b, 0x48, 0x54, 0x23, 0xc2,
+ 0xbf, 0x3b, 0xf0, 0x60, 0x2e, 0x07, 0xdb, 0x82, 0x33, 0xd8, 0x14, 0xa4, 0xe0, 0x92, 0x2a, 0x2e,
+ 0x28, 0x91, 0x7e, 0xa7, 0xdf, 0x1d, 0x6c, 0x9c, 0x0c, 0x86, 0x66, 0xf0, 0x86, 0xad, 0x46, 0xc3,
+ 0xc8, 0x59, 0x94, 0xd1, 0x35, 0xeb, 0xe0, 0x97, 0x0e, 0xc0, 0x4c, 0xb9, 0xc8, 0xbb, 0xd7, 0xc2,
+ 0xfb, 0x59, 0x63, 0x08, 0x4c, 0xf4, 0xe3, 0x7f, 0x1b, 0x7d, 0xe8, 0xb8, 0xae, 0x3d, 0xa0, 0x87,
+ 0xb0, 0x2e, 0x08, 0xce, 0x62, 0x9e, 0xb3, 0x52, 0x77, 0xae, 0x17, 0xf5, 0x2a, 0xc1, 0xeb, 0x9c,
+ 0x95, 0xc8, 0x87, 0x7b, 0x85, 0xa0, 0x13, 0x2c, 0x4a, 0x7f, 0x4d, 0x67, 0xe2, 0x8e, 0xc1, 0xb7,
+ 0x70, 0xcf, 0xb1, 0x8d, 0x60, 0x2d, 0xc7, 0x13, 0xd7, 0x0b, 0xfd, 0x5d, 0x79, 0x4d, 0xc8, 0x98,
+ 0xe6, 0x59, 0x9c, 0x94, 0x9a, 0xee, 0x6e, 0xd4, 0x33, 0x82, 0x17, 0x65, 0x35, 0xc5, 0x58, 0x4a,
+ 0x3a, 0xca, 0x49, 0xe6, 0x22, 0xba, 0x73, 0xf8, 0x1a, 0xde, 0x6b, 0x90, 0x65, 0x1a, 0x22, 0xdd,
+ 0x00, 0x9c, 0x00, 0xd4, 0xec, 0x95, 0x3a, 0xde, 0xc6, 0x09, 0x72, 0xb5, 0x37, 0xcc, 0x1a, 0xa8,
+ 0xf0, 0x6d, 0x07, 0x82, 0x36, 0x8f, 0xb6, 0x9d, 0xdf, 0xcc, 0x2a, 0x34, 0xfe, 0x3e, 0x6d, 0xf1,
+ 0x37, 0x67, 0xd4, 0x50, 0xbd, 0x24, 0x0a, 0x53, 0x26, 0x6b, 0x5e, 0xd0, 0x39, 0xf4, 0xec, 0xa0,
+ 0xb9, 0xe6, 0xdc, 0xce, 0x61, 0xed, 0x25, 0x48, 0x61, 0x77, 0x41, 0x7d, 0x1b, 0x26, 0x2a, 0xda,
+ 0xd3, 0x6a, 0x2e, 0xe4, 0x74, 0x62, 0xef, 0x73, 0x7d, 0x0e, 0x7f, 0xf7, 0x60, 0xff, 0x94, 0xe7,
+ 0x92, 0x4a, 0x45, 0xf2, 0xb4, 0xbc, 0xdd, 0xb5, 0xfb, 0x10, 0xb6, 0x15, 0x16, 0x23, 0xa2, 0x6a,
+ 0x9c, 0x09, 0xb3, 0x65, 0xa4, 0x0e, 0xf6, 0x31, 0xec, 0x0a, 0x72, 0x49, 0x04, 0xc9, 0xd3, 0xf9,
+ 0x9d, 0xb1, 0x53, 0x2b, 0x1c, 0xf8, 0x33, 0xd8, 0xcf, 0xa8, 0xc4, 0x09, 0x23, 0xb1, 0x20, 0x29,
+ 0xcf, 0x53, 0xca, 0x18, 0xd5, 0xab, 0x51, 0x4f, 0x64, 0x2f, 0x7a, 0xd7, 0xaa, 0xa3, 0xeb, 0xda,
+ 0xf0, 0x0f, 0x0f, 0xfc, 0xc5, 0x8a, 0x6c, 0xd7, 0x3f, 0xd1, 0x6b, 0x99, 0xc7, 0x6d, 0x97, 0x6d,
+ 0xa7, 0xd2, 0x44, 0xcd, 0x0b, 0x77, 0x08, 0xf7, 0x6d, 0x5d, 0x73, 0xfc, 0xd9, 0x72, 0x4f, 0xad,
+ 0xd4, 0x6c, 0x7b, 0x57, 0x59, 0x8d, 0x35, 0xa5, 0xcd, 0x6a, 0xae, 0xe1, 0x8f, 0x60, 0xa3, 0xea,
+ 0x72, 0xfc, 0x86, 0x27, 0x31, 0xcd, 0x74, 0x3d, 0x6b, 0xd1, 0x7a, 0x25, 0xfa, 0x9a, 0x27, 0xaf,
+ 0xb2, 0x76, 0xa2, 0xee, 0xb4, 0x13, 0x75, 0xf2, 0x67, 0x17, 0xde, 0x39, 0xb7, 0x8f, 0xe1, 0xab,
+ 0xfc, 0x92, 0x5f, 0x10, 0x71, 0x45, 0x53, 0x82, 0x7e, 0x00, 0xb4, 0x38, 0x78, 0xe8, 0xc9, 0xaa,
+ 0xa1, 0xd4, 0x6d, 0x0f, 0xc2, 0x9b, 0xe7, 0x16, 0x7d, 0x07, 0x3b, 0xf3, 0x1c, 0xa3, 0xc7, 0xce,
+ 0x6e, 0xc9, 0x3c, 0x05, 0xfd, 0xe5, 0x00, 0xe3, 0xf6, 0xd8, 0x43, 0x67, 0xb0, 0x75, 0x6d, 0x97,
+ 0xa1, 0x83, 0x25, 0x2b, 0xce, 0xb8, 0x7c, 0x7f, 0xe5, 0x02, 0x44, 0x6f, 0x60, 0x7f, 0xc9, 0xa3,
+ 0x83, 0x3e, 0x72, 0x96, 0xab, 0x9f, 0xce, 0xe0, 0xf0, 0x46, 0x9c, 0x8d, 0x85, 0x61, 0xaf, 0xed,
+ 0x09, 0x47, 0x4f, 0x1b, 0x0e, 0x96, 0xfd, 0x7c, 0x04, 0x1f, 0xac, 0x06, 0x99, 0x10, 0xc1, 0xda,
+ 0x5f, 0xbf, 0x0e, 0xbc, 0x17, 0xc7, 0xdf, 0x57, 0x60, 0x86, 0x93, 0x61, 0xca, 0x27, 0x47, 0xe6,
+ 0xf3, 0x19, 0x17, 0xa3, 0x23, 0xe3, 0xe2, 0x48, 0xff, 0xee, 0x1c, 0x8d, 0xb8, 0x3d, 0x17, 0x49,
+ 0x72, 0x57, 0x8b, 0x9e, 0xff, 0x13, 0x00, 0x00, 0xff, 0xff, 0x99, 0x51, 0xfc, 0x0b, 0x35, 0x09,
+ 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
@@ -701,6 +808,15 @@ type PraefectInfoServiceClient interface {
// This causes the current version of the repository on the authoritative storage to be considered the
// latest and overwrite any other version on the virtual storage.
SetAuthoritativeStorage(ctx context.Context, in *SetAuthoritativeStorageRequest, opts ...grpc.CallOption) (*SetAuthoritativeStorageResponse, error)
+ // SetReplicationFactor assigns or unassigns host nodes from the repository to meet the desired replication factor.
+ // SetReplicationFactor returns an error when trying to set a replication factor that exceeds the storage node count
+ // in the virtual storage. An error is also returned when trying to set a replication factor below one. The primary node
+ // won't be unassigned as it needs a copy of the repository to accept writes. Likewise, the primary is the first storage
+ // that gets assigned when setting a replication factor for a repository. Assignments of unconfigured storages are ignored.
+ // This might cause the actual replication factor to be higher than desired if the replication factor is set during an upgrade
+ // from a Praefect node that does not yet know about a new node. As assignments of unconfigured storages are ignored, replication
+ // factor of repositories assigned to a storage node removed from the cluster is effectively decreased.
+ SetReplicationFactor(ctx context.Context, in *SetReplicationFactorRequest, opts ...grpc.CallOption) (*SetReplicationFactorResponse, error)
}
type praefectInfoServiceClient struct {
@@ -770,6 +886,15 @@ func (c *praefectInfoServiceClient) SetAuthoritativeStorage(ctx context.Context,
return out, nil
}
+func (c *praefectInfoServiceClient) SetReplicationFactor(ctx context.Context, in *SetReplicationFactorRequest, opts ...grpc.CallOption) (*SetReplicationFactorResponse, error) {
+ out := new(SetReplicationFactorResponse)
+ err := c.cc.Invoke(ctx, "/gitaly.PraefectInfoService/SetReplicationFactor", in, out, opts...)
+ if err != nil {
+ return nil, err
+ }
+ return out, nil
+}
+
// PraefectInfoServiceServer is the server API for PraefectInfoService service.
type PraefectInfoServiceServer interface {
RepositoryReplicas(context.Context, *RepositoryReplicasRequest) (*RepositoryReplicasResponse, error)
@@ -784,6 +909,15 @@ type PraefectInfoServiceServer interface {
// This causes the current version of the repository on the authoritative storage to be considered the
// latest and overwrite any other version on the virtual storage.
SetAuthoritativeStorage(context.Context, *SetAuthoritativeStorageRequest) (*SetAuthoritativeStorageResponse, error)
+ // SetReplicationFactor assigns or unassigns host nodes from the repository to meet the desired replication factor.
+ // SetReplicationFactor returns an error when trying to set a replication factor that exceeds the storage node count
+ // in the virtual storage. An error is also returned when trying to set a replication factor below one. The primary node
+ // won't be unassigned as it needs a copy of the repository to accept writes. Likewise, the primary is the first storage
+ // that gets assigned when setting a replication factor for a repository. Assignments of unconfigured storages are ignored.
+ // This might cause the actual replication factor to be higher than desired if the replication factor is set during an upgrade
+ // from a Praefect node that does not yet know about a new node. As assignments of unconfigured storages are ignored, replication
+ // factor of repositories assigned to a storage node removed from the cluster is effectively decreased.
+ SetReplicationFactor(context.Context, *SetReplicationFactorRequest) (*SetReplicationFactorResponse, error)
}
// UnimplementedPraefectInfoServiceServer can be embedded to have forward compatible implementations.
@@ -802,6 +936,9 @@ func (*UnimplementedPraefectInfoServiceServer) DatalossCheck(ctx context.Context
func (*UnimplementedPraefectInfoServiceServer) SetAuthoritativeStorage(ctx context.Context, req *SetAuthoritativeStorageRequest) (*SetAuthoritativeStorageResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method SetAuthoritativeStorage not implemented")
}
+func (*UnimplementedPraefectInfoServiceServer) SetReplicationFactor(ctx context.Context, req *SetReplicationFactorRequest) (*SetReplicationFactorResponse, error) {
+ return nil, status.Errorf(codes.Unimplemented, "method SetReplicationFactor not implemented")
+}
func RegisterPraefectInfoServiceServer(s *grpc.Server, srv PraefectInfoServiceServer) {
s.RegisterService(&_PraefectInfoService_serviceDesc, srv)
@@ -882,6 +1019,24 @@ func _PraefectInfoService_SetAuthoritativeStorage_Handler(srv interface{}, ctx c
return interceptor(ctx, in, info, handler)
}
+func _PraefectInfoService_SetReplicationFactor_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+ in := new(SetReplicationFactorRequest)
+ if err := dec(in); err != nil {
+ return nil, err
+ }
+ if interceptor == nil {
+ return srv.(PraefectInfoServiceServer).SetReplicationFactor(ctx, in)
+ }
+ info := &grpc.UnaryServerInfo{
+ Server: srv,
+ FullMethod: "/gitaly.PraefectInfoService/SetReplicationFactor",
+ }
+ handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+ return srv.(PraefectInfoServiceServer).SetReplicationFactor(ctx, req.(*SetReplicationFactorRequest))
+ }
+ return interceptor(ctx, in, info, handler)
+}
+
var _PraefectInfoService_serviceDesc = grpc.ServiceDesc{
ServiceName: "gitaly.PraefectInfoService",
HandlerType: (*PraefectInfoServiceServer)(nil),
@@ -898,6 +1053,10 @@ var _PraefectInfoService_serviceDesc = grpc.ServiceDesc{
MethodName: "SetAuthoritativeStorage",
Handler: _PraefectInfoService_SetAuthoritativeStorage_Handler,
},
+ {
+ MethodName: "SetReplicationFactor",
+ Handler: _PraefectInfoService_SetReplicationFactor_Handler,
+ },
},
Streams: []grpc.StreamDesc{
{
diff --git a/proto/praefect.proto b/proto/praefect.proto
index 5d2c05970..e9676d20c 100644
--- a/proto/praefect.proto
+++ b/proto/praefect.proto
@@ -24,6 +24,32 @@ service PraefectInfoService {
// This causes the current version of the repository on the authoritative storage to be considered the
// latest and overwrite any other version on the virtual storage.
rpc SetAuthoritativeStorage(SetAuthoritativeStorageRequest) returns (SetAuthoritativeStorageResponse);
+
+ // SetReplicationFactor assigns or unassigns host nodes from the repository to meet the desired replication factor.
+ // SetReplicationFactor returns an error when trying to set a replication factor that exceeds the storage node count
+ // in the virtual storage. An error is also returned when trying to set a replication factor below one. The primary node
+ // won't be unassigned as it needs a copy of the repository to accept writes. Likewise, the primary is the first storage
+ // that gets assigned when setting a replication factor for a repository. Assignments of unconfigured storages are ignored.
+ // This might cause the actual replication factor to be higher than desired if the replication factor is set during an upgrade
+ // from a Praefect node that does not yet know about a new node. As assignments of unconfigured storages are ignored, replication
+ // factor of repositories assigned to a storage node removed from the cluster is effectively decreased.
+ rpc SetReplicationFactor(SetReplicationFactorRequest) returns (SetReplicationFactorResponse);
+}
+
+// SetReplicationFactorRequest sets the desired replication factor for a repository.
+message SetReplicationFactorRequest {
+ // virtual_storage is the virtual storage the repository is located in
+ string virtual_storage = 1;
+ // relative_path is the relative path of the repository
+ string relative_path = 2;
+ // replication_factor is the desired replication factor. Replication must be equal or greater than 1.
+ int32 replication_factor = 3;
+}
+
+// SetReplicationFactorResponse returns the assigned hosts after setting the desired replication factor.
+message SetReplicationFactorResponse {
+ // storages are the storages assigned to host the repository.
+ repeated string storages = 1;
}
message SetAuthoritativeStorageRequest {
diff --git a/ruby/proto/gitaly/praefect_pb.rb b/ruby/proto/gitaly/praefect_pb.rb
index 73f17f34a..b5f5da567 100644
--- a/ruby/proto/gitaly/praefect_pb.rb
+++ b/ruby/proto/gitaly/praefect_pb.rb
@@ -7,6 +7,14 @@ require 'lint_pb'
require 'shared_pb'
Google::Protobuf::DescriptorPool.generated_pool.build do
add_file("praefect.proto", :syntax => :proto3) do
+ add_message "gitaly.SetReplicationFactorRequest" do
+ optional :virtual_storage, :string, 1
+ optional :relative_path, :string, 2
+ optional :replication_factor, :int32, 3
+ end
+ add_message "gitaly.SetReplicationFactorResponse" do
+ repeated :storages, :string, 1
+ end
add_message "gitaly.SetAuthoritativeStorageRequest" do
optional :virtual_storage, :string, 1
optional :relative_path, :string, 2
@@ -60,6 +68,8 @@ Google::Protobuf::DescriptorPool.generated_pool.build do
end
module Gitaly
+ SetReplicationFactorRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.SetReplicationFactorRequest").msgclass
+ SetReplicationFactorResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.SetReplicationFactorResponse").msgclass
SetAuthoritativeStorageRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.SetAuthoritativeStorageRequest").msgclass
SetAuthoritativeStorageResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.SetAuthoritativeStorageResponse").msgclass
DatalossCheckRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.DatalossCheckRequest").msgclass
diff --git a/ruby/proto/gitaly/praefect_services_pb.rb b/ruby/proto/gitaly/praefect_services_pb.rb
index 5d1e4ae70..9eb597889 100644
--- a/ruby/proto/gitaly/praefect_services_pb.rb
+++ b/ruby/proto/gitaly/praefect_services_pb.rb
@@ -26,6 +26,15 @@ module Gitaly
# This causes the current version of the repository on the authoritative storage to be considered the
# latest and overwrite any other version on the virtual storage.
rpc :SetAuthoritativeStorage, Gitaly::SetAuthoritativeStorageRequest, Gitaly::SetAuthoritativeStorageResponse
+ # SetReplicationFactor assigns or unassigns host nodes from the repository to meet the desired replication factor.
+ # SetReplicationFactor returns an error when trying to set a replication factor that exceeds the storage node count
+ # in the virtual storage. An error is also returned when trying to set a replication factor below one. The primary node
+ # won't be unassigned as it needs a copy of the repository to accept writes. Likewise, the primary is the first storage
+ # that gets assigned when setting a replication factor for a repository. Assignments of unconfigured storages are ignored.
+ # This might cause the actual replication factor to be higher than desired if the replication factor is set during an upgrade
+ # from a Praefect node that does not yet know about a new node. As assignments of unconfigured storages are ignored, replication
+ # factor of repositories assigned to a storage node removed from the cluster is effectively decreased.
+ rpc :SetReplicationFactor, Gitaly::SetReplicationFactorRequest, Gitaly::SetReplicationFactorResponse
end
Stub = Service.rpc_stub_class