diff options
author | Toon Claes <toon@gitlab.com> | 2020-12-04 17:08:47 +0300 |
---|---|---|
committer | Toon Claes <toon@gitlab.com> | 2020-12-04 17:08:47 +0300 |
commit | a6a6ca5098a45507c17d3828fbd309569c19ffdf (patch) | |
tree | 58073dc7a124d286e47410bd34c79271837aac72 | |
parent | 2eb4db13dab06b87382582f5fcddab0c8397463e (diff) | |
parent | c298d3efb7a0a270b0cfc73f51baca6312eb50fe (diff) |
Merge branch 'smh-set-replication-factor' into 'master'
Set replication factor for a repository
See merge request gitlab-org/gitaly!2851
22 files changed, 766 insertions, 84 deletions
diff --git a/changelogs/unreleased/smh-set-replication-factor.yml b/changelogs/unreleased/smh-set-replication-factor.yml new file mode 100644 index 000000000..990b09392 --- /dev/null +++ b/changelogs/unreleased/smh-set-replication-factor.yml @@ -0,0 +1,5 @@ +--- +title: Set replication factor for a repository +merge_request: 2851 +author: +type: added diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index 9a0ad7485..73dc1ca62 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -108,6 +108,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes/tracker" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/internal/praefect/reconciler" + "gitlab.com/gitlab-org/gitaly/internal/praefect/service/info" "gitlab.com/gitlab-org/gitaly/internal/praefect/transactions" "gitlab.com/gitlab-org/gitaly/internal/version" "gitlab.com/gitlab-org/labkit/monitoring" @@ -238,6 +239,7 @@ func run(cfgs []starter.Config, conf config.Config) error { var rs datastore.RepositoryStore var sp nodes.StorageProvider var metricsCollectors []prometheus.Collector + var replicationFactorSetter info.ReplicationFactorSetter if conf.MemoryQueueEnabled { queue = datastore.NewMemoryReplicationEventQueue(conf) @@ -326,13 +328,16 @@ func run(cfgs []starter.Config, conf config.Config) error { } }() + assignmentStore := datastore.NewAssignmentStore(db, conf.StorageNames()) + replicationFactorSetter = assignmentStore + router = praefect.NewPerRepositoryRouter( nodeSet.Connections(), elector, hm, praefect.NewLockedRandom(rand.New(rand.NewSource(time.Now().UnixNano()))), rs, - datastore.NewAssignmentStore(db, conf.StorageNames()), + assignmentStore, ) } else { healthChecker = praefect.HealthChecker(nodeManager) @@ -377,6 +382,7 @@ func run(cfgs []starter.Config, conf config.Config) error { transactionManager, queue, rs, + replicationFactorSetter, protoregistry.GitalyProtoPreregistered, ) ) diff --git a/cmd/praefect/subcmd.go b/cmd/praefect/subcmd.go index 7b867dc54..9fbff7675 100644 --- a/cmd/praefect/subcmd.go +++ b/cmd/praefect/subcmd.go @@ -24,14 +24,15 @@ type subcmd interface { var ( subcommands = map[string]subcmd{ - "sql-ping": &sqlPingSubcommand{}, - "sql-migrate": &sqlMigrateSubcommand{}, - "dial-nodes": &dialNodesSubcommand{}, - "reconcile": &reconcileSubcommand{}, - "sql-migrate-down": &sqlMigrateDownSubcommand{}, - "sql-migrate-status": &sqlMigrateStatusSubcommand{}, - "dataloss": newDatalossSubcommand(), - "accept-dataloss": &acceptDatalossSubcommand{}, + "sql-ping": &sqlPingSubcommand{}, + "sql-migrate": &sqlMigrateSubcommand{}, + "dial-nodes": &dialNodesSubcommand{}, + "reconcile": &reconcileSubcommand{}, + "sql-migrate-down": &sqlMigrateDownSubcommand{}, + "sql-migrate-status": &sqlMigrateStatusSubcommand{}, + "dataloss": newDatalossSubcommand(), + "accept-dataloss": &acceptDatalossSubcommand{}, + "set-replication-factor": newSetReplicatioFactorSubcommand(os.Stdout), } ) diff --git a/cmd/praefect/subcmd_accept_dataloss_test.go b/cmd/praefect/subcmd_accept_dataloss_test.go index 9d2c7a303..7927df9dc 100644 --- a/cmd/praefect/subcmd_accept_dataloss_test.go +++ b/cmd/praefect/subcmd_accept_dataloss_test.go @@ -61,7 +61,7 @@ func TestAcceptDatalossSubcommand(t *testing.T) { }, } - ln, clean := listenAndServe(t, []svcRegistrar{registerPraefectInfoServer(info.NewServer(nil, conf, q, rs))}) + ln, clean := listenAndServe(t, []svcRegistrar{registerPraefectInfoServer(info.NewServer(nil, conf, q, rs, nil))}) defer clean() conf.SocketPath = ln.Addr().String() diff --git a/cmd/praefect/subcmd_dataloss_test.go b/cmd/praefect/subcmd_dataloss_test.go index fa5a0533f..9a0b8ddbc 100644 --- a/cmd/praefect/subcmd_dataloss_test.go +++ b/cmd/praefect/subcmd_dataloss_test.go @@ -79,7 +79,7 @@ func TestDatalossSubcommand(t *testing.T) { require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "repository-2", "gitaly-3", 0)) ln, clean := listenAndServe(t, []svcRegistrar{ - registerPraefectInfoServer(info.NewServer(mgr, cfg, nil, gs))}) + registerPraefectInfoServer(info.NewServer(mgr, cfg, nil, gs, nil))}) defer clean() for _, tc := range []struct { desc string diff --git a/cmd/praefect/subcmd_set_replication_factor.go b/cmd/praefect/subcmd_set_replication_factor.go new file mode 100644 index 000000000..b179b0153 --- /dev/null +++ b/cmd/praefect/subcmd_set_replication_factor.go @@ -0,0 +1,70 @@ +package main + +import ( + "context" + "flag" + "fmt" + "io" + "strings" + + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" +) + +const paramReplicationFactor = "replication-factor" + +type setReplicationFactorSubcommand struct { + stdout io.Writer + virtualStorage string + relativePath string + replicationFactor int +} + +func newSetReplicatioFactorSubcommand(stdout io.Writer) *setReplicationFactorSubcommand { + return &setReplicationFactorSubcommand{stdout: stdout} +} + +func (cmd *setReplicationFactorSubcommand) FlagSet() *flag.FlagSet { + fs := flag.NewFlagSet("set-replication-factor", flag.ContinueOnError) + fs.StringVar(&cmd.virtualStorage, paramVirtualStorage, "", "name of the repository's virtual storage") + fs.StringVar(&cmd.relativePath, paramRelativePath, "", "repository to set the replication factor for") + fs.IntVar(&cmd.replicationFactor, paramReplicationFactor, -1, "desired replication factor") + return fs +} + +func (cmd *setReplicationFactorSubcommand) Exec(flags *flag.FlagSet, cfg config.Config) error { + if flags.NArg() > 0 { + return unexpectedPositionalArgsError{Command: flags.Name()} + } else if cmd.virtualStorage == "" { + return requiredParameterError(paramVirtualStorage) + } else if cmd.relativePath == "" { + return requiredParameterError(paramRelativePath) + } else if cmd.replicationFactor < 0 { + return requiredParameterError(paramReplicationFactor) + } + + nodeAddr, err := getNodeAddress(cfg) + if err != nil { + return err + } + + conn, err := subCmdDial(nodeAddr, cfg.Auth.Token) + if err != nil { + return fmt.Errorf("error dialing: %w", err) + } + defer conn.Close() + + client := gitalypb.NewPraefectInfoServiceClient(conn) + resp, err := client.SetReplicationFactor(context.TODO(), &gitalypb.SetReplicationFactorRequest{ + VirtualStorage: cmd.virtualStorage, + RelativePath: cmd.relativePath, + ReplicationFactor: int32(cmd.replicationFactor), + }) + if err != nil { + return err + } + + fmt.Fprintf(cmd.stdout, "current assignments: %v", strings.Join(resp.Storages, ", ")) + + return nil +} diff --git a/cmd/praefect/subcmd_set_replication_factor_test.go b/cmd/praefect/subcmd_set_replication_factor_test.go new file mode 100644 index 000000000..b56d566ed --- /dev/null +++ b/cmd/praefect/subcmd_set_replication_factor_test.go @@ -0,0 +1,100 @@ +// +build postgres + +package main + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/internal/praefect/service/info" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func TestSetReplicationFactorSubcommand(t *testing.T) { + for _, tc := range []struct { + desc string + args []string + error error + stdout string + }{ + { + desc: "unexpected positional arguments", + args: []string{"positonal-arg"}, + error: unexpectedPositionalArgsError{Command: "set-replication-factor"}, + }, + { + desc: "missing virtual-storage", + args: []string{}, + error: requiredParameterError("virtual-storage"), + }, + { + desc: "missing repository", + args: []string{"-virtual-storage=virtual-storage"}, + error: requiredParameterError("repository"), + }, + { + desc: "missing replication-factor", + args: []string{"-virtual-storage=virtual-storage", "-repository=relative-path"}, + error: requiredParameterError("replication-factor"), + }, + { + desc: "replication factor too small", + args: []string{"-virtual-storage=virtual-storage", "-repository=relative-path", "-replication-factor=0"}, + error: status.Error(codes.Unknown, "set replication factor: attempted to set replication factor 0 but minimum is 1"), + }, + { + desc: "replication factor too big", + args: []string{"-virtual-storage=virtual-storage", "-repository=relative-path", "-replication-factor=3"}, + error: status.Error(codes.Unknown, "set replication factor: attempted to set replication factor 3 but virtual storage only contains 2 storages"), + }, + { + desc: "virtual storage not found", + args: []string{"-virtual-storage=non-existent", "-repository=relative-path", "-replication-factor=2"}, + error: status.Error(codes.Unknown, `set replication factor: unknown virtual storage: "non-existent"`), + }, + { + desc: "repository not found", + args: []string{"-virtual-storage=virtual-storage", "-repository=non-existent", "-replication-factor=2"}, + error: status.Error(codes.Unknown, `set replication factor: repository "virtual-storage"/"non-existent" not found`), + }, + { + desc: "successfully set", + args: []string{"-virtual-storage=virtual-storage", "-repository=relative-path", "-replication-factor=2"}, + stdout: "current assignments: primary, secondary", + }, + } { + t.Run(tc.desc, func(t *testing.T) { + ctx, cancel := testhelper.Context() + defer cancel() + + db := getDB(t) + + store := datastore.NewAssignmentStore(db, map[string][]string{"virtual-storage": {"primary", "secondary"}}) + + // create a repository record + require.NoError(t, + datastore.NewPostgresRepositoryStore(db, nil).SetGeneration(ctx, "virtual-storage", "relative-path", "primary", 0), + ) + + ln, clean := listenAndServe(t, []svcRegistrar{registerPraefectInfoServer( + info.NewServer(nil, config.Config{}, nil, nil, store), + )}) + defer clean() + + stdout := &bytes.Buffer{} + cmd := &setReplicationFactorSubcommand{stdout: stdout} + fs := cmd.FlagSet() + require.NoError(t, fs.Parse(tc.args)) + err := cmd.Exec(fs, config.Config{ + SocketPath: ln.Addr().String(), + }) + require.Equal(t, tc.error, err) + require.Equal(t, tc.stdout, stdout.String()) + }) + } +} diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go index 0fceb176d..899b67758 100644 --- a/internal/praefect/auth_test.go +++ b/internal/praefect/auth_test.go @@ -178,7 +178,7 @@ func runServer(t *testing.T, token string, required bool) (*grpc.Server, string, coordinator := NewCoordinator(queue, nil, NewNodeManagerRouter(nodeMgr, nil), txMgr, conf, registry) - srv := NewGRPCServer(conf, logEntry, registry, coordinator.StreamDirector, nodeMgr, txMgr, queue, nil) + srv := NewGRPCServer(conf, logEntry, registry, coordinator.StreamDirector, nodeMgr, txMgr, queue, nil, nil) serverSocketPath := testhelper.GetTemporaryGitalySocketFileName() diff --git a/internal/praefect/datastore/assignment.go b/internal/praefect/datastore/assignment.go index 6bab0a594..77904c50c 100644 --- a/internal/praefect/datastore/assignment.go +++ b/internal/praefect/datastore/assignment.go @@ -12,6 +12,18 @@ func newVirtualStorageNotFoundError(virtualStorage string) error { return fmt.Errorf("virtual storage %q not found", virtualStorage) } +func newUnattainableReplicationFactorError(attempted, maximum int) error { + return fmt.Errorf("attempted to set replication factor %d but virtual storage only contains %d storages", attempted, maximum) +} + +func newMinimumReplicationFactorError(replicationFactor int) error { + return fmt.Errorf("attempted to set replication factor %d but minimum is 1", replicationFactor) +} + +func newRepositoryNotFoundError(virtualStorage, relativePath string) error { + return fmt.Errorf("repository %q/%q not found", virtualStorage, relativePath) +} + // AssignmentStore manages host assignments in Postgres. type AssignmentStore struct { db glsql.Querier @@ -61,3 +73,120 @@ AND storage = ANY($3) return assignedStorages, nil } + +// SetReplicationFactor assigns or unassigns a repository's host nodes until the desired replication factor is met. +// Please see the protobuf documentation of the method for details. +func (s AssignmentStore) SetReplicationFactor(ctx context.Context, virtualStorage, relativePath string, replicationFactor int) ([]string, error) { + candidateStorages, ok := s.configuredStorages[virtualStorage] + if !ok { + return nil, fmt.Errorf("unknown virtual storage: %q", virtualStorage) + } + + if replicationFactor < 1 { + return nil, newMinimumReplicationFactorError(replicationFactor) + } + + if max := len(candidateStorages); replicationFactor > max { + return nil, newUnattainableReplicationFactorError(replicationFactor, max) + } + + // The query works as follows: + // + // 1. `repository` CTE locks the repository's record for the duration of the update. + // This prevents concurrent updates to the `repository_assignments` table for the given + // repository. It is not sufficient to rely on row locks in `repository_assignments` + // as there might be rows being inserted or deleted in another transaction that + // our transaction does not lock. This could be the case if the replication factor + // is being increased concurrently from two different nodes and they assign different + // storages. + // + // 2. `existing_assignments` CTE gets the existing assignments for the repository. While + // there may be assignments in the database for storage nodes that were removed from the + // cluster, the query filters them out. + // + // 3. `created_assignments` CTE assigns new hosts to the repository if the replication + // factor has been increased. Random storages which are not yet assigned to the repository + // are picked until the replication factor is met. The primary of a repository is always + // assigned first. + // + // 4. `removed_assignments` CTE removes host assignments if the replication factor has been + // decreased. Primary is never removed as it needs a copy of the repository in order to + // accept writes. Random hosts are removed until the replication factor is met. + // + // 6. Finally we return the current set of assignments. CTE updates are not visible in the + // tables during the transaction. To account for that, we filter out removed assignments + // from the existing assignments. If the replication factor was increased, we'll include the + // created assignments. If the replication factor did not change, the query returns the + // current assignments. + rows, err := s.db.QueryContext(ctx, ` +WITH repository AS ( + SELECT virtual_storage, relative_path, "primary" + FROM repositories + WHERE virtual_storage = $1 + AND relative_path = $2 + FOR UPDATE +), + +existing_assignments AS ( + SELECT storage + FROM repository + JOIN repository_assignments USING (virtual_storage, relative_path) + WHERE storage = ANY($4::text[]) +), + +created_assignments AS ( + INSERT INTO repository_assignments + SELECT virtual_storage, relative_path, storage + FROM repository + CROSS JOIN ( SELECT unnest($4::text[]) AS storage ) AS configured_storages + WHERE storage NOT IN ( SELECT storage FROM existing_assignments ) + ORDER BY CASE WHEN storage = "primary" THEN 1 ELSE 0 END DESC, random() + LIMIT ( SELECT GREATEST(COUNT(*), $3) - COUNT(*) FROM existing_assignments ) + RETURNING storage +), + +removed_assignments AS ( + DELETE FROM repository_assignments + USING ( + SELECT virtual_storage, relative_path, storage + FROM repository, existing_assignments + WHERE storage != "primary" + ORDER BY random() + LIMIT ( SELECT COUNT(*) - LEAST(COUNT(*), $3) FROM existing_assignments ) + ) AS removals + WHERE repository_assignments.virtual_storage = removals.virtual_storage + AND repository_assignments.relative_path = removals.relative_path + AND repository_assignments.storage = removals.storage + RETURNING removals.storage +) + +SELECT storage +FROM existing_assignments +WHERE storage NOT IN ( SELECT storage FROM removed_assignments ) +UNION +SELECT storage +FROM created_assignments +ORDER BY storage + `, virtualStorage, relativePath, replicationFactor, pq.StringArray(candidateStorages)) + if err != nil { + return nil, fmt.Errorf("query: %w", err) + } + + defer rows.Close() + + var storages []string + for rows.Next() { + var storage string + if err := rows.Scan(&storage); err != nil { + return nil, fmt.Errorf("scan: %w", err) + } + + storages = append(storages, storage) + } + + if len(storages) == 0 { + return nil, newRepositoryNotFoundError(virtualStorage, relativePath) + } + + return storages, rows.Err() +} diff --git a/internal/praefect/datastore/assignment_test.go b/internal/praefect/datastore/assignment_test.go index 5baeda66d..96f9add51 100644 --- a/internal/praefect/datastore/assignment_test.go +++ b/internal/praefect/datastore/assignment_test.go @@ -98,3 +98,133 @@ func TestAssignmentStore_GetHostAssignments(t *testing.T) { }) } } + +func TestAssignmentStore_SetReplicationFactor(t *testing.T) { + type matcher func(testing.TB, []string) + + equal := func(expected []string) matcher { + return func(t testing.TB, actual []string) { + t.Helper() + require.Equal(t, expected, actual) + } + } + + contains := func(expecteds ...[]string) matcher { + return func(t testing.TB, actual []string) { + t.Helper() + require.Contains(t, expecteds, actual) + } + } + + for _, tc := range []struct { + desc string + existingAssignments []string + nonExistentRepository bool + replicationFactor int + requireStorages matcher + error error + }{ + { + desc: "increase replication factor of non-existent repository", + nonExistentRepository: true, + replicationFactor: 1, + error: newRepositoryNotFoundError("virtual-storage", "relative-path"), + }, + { + desc: "primary prioritized when setting the first assignments", + replicationFactor: 1, + requireStorages: equal([]string{"primary"}), + }, + { + desc: "increasing replication factor ignores unconfigured storages", + existingAssignments: []string{"unconfigured-storage"}, + replicationFactor: 1, + requireStorages: equal([]string{"primary"}), + }, + { + desc: "replication factor already achieved", + existingAssignments: []string{"primary", "secondary-1"}, + replicationFactor: 2, + requireStorages: equal([]string{"primary", "secondary-1"}), + }, + { + desc: "increase replication factor by a step", + existingAssignments: []string{"primary"}, + replicationFactor: 2, + requireStorages: contains([]string{"primary", "secondary-1"}, []string{"primary", "secondary-2"}), + }, + { + desc: "increase replication factor to maximum", + existingAssignments: []string{"primary"}, + replicationFactor: 3, + requireStorages: equal([]string{"primary", "secondary-1", "secondary-2"}), + }, + { + desc: "increased replication factor unattainable", + existingAssignments: []string{"primary"}, + replicationFactor: 4, + error: newUnattainableReplicationFactorError(4, 3), + }, + { + desc: "decreasing replication factor ignores unconfigured storages", + existingAssignments: []string{"secondary-1", "unconfigured-storage"}, + replicationFactor: 1, + requireStorages: equal([]string{"secondary-1"}), + }, + { + desc: "decrease replication factor by a step", + existingAssignments: []string{"primary", "secondary-1", "secondary-2"}, + replicationFactor: 2, + requireStorages: contains([]string{"primary", "secondary-1"}, []string{"primary", "secondary-2"}), + }, + { + desc: "decrease replication factor to minimum", + existingAssignments: []string{"primary", "secondary-1", "secondary-2"}, + replicationFactor: 1, + requireStorages: equal([]string{"primary"}), + }, + { + desc: "minimum replication factor is enforced", + replicationFactor: 0, + error: newMinimumReplicationFactorError(0), + }, + } { + t.Run(tc.desc, func(t *testing.T) { + ctx, cancel := testhelper.Context() + defer cancel() + + db := getDB(t) + + configuredStorages := map[string][]string{"virtual-storage": {"primary", "secondary-1", "secondary-2"}} + + if !tc.nonExistentRepository { + _, err := db.ExecContext(ctx, ` + INSERT INTO repositories (virtual_storage, relative_path, "primary") + VALUES ('virtual-storage', 'relative-path', 'primary') + `) + require.NoError(t, err) + } + + for _, storage := range tc.existingAssignments { + _, err := db.ExecContext(ctx, ` + INSERT INTO repository_assignments VALUES ('virtual-storage', 'relative-path', $1) + `, storage) + require.NoError(t, err) + } + + store := NewAssignmentStore(db, configuredStorages) + + setStorages, err := store.SetReplicationFactor(ctx, "virtual-storage", "relative-path", tc.replicationFactor) + require.Equal(t, tc.error, err) + if tc.error != nil { + return + } + + tc.requireStorages(t, setStorages) + + assignedStorages, err := store.GetHostAssignments(ctx, "virtual-storage", "relative-path") + require.NoError(t, err) + tc.requireStorages(t, assignedStorages) + }) + } +} diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go index b742ca199..d469003c2 100644 --- a/internal/praefect/helper_test.go +++ b/internal/praefect/helper_test.go @@ -242,7 +242,7 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp NodeSetFromNodeManager(opt.withNodeMgr), ) - prf := NewGRPCServer(conf, opt.withLogger, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, opt.withNodeMgr, opt.withTxMgr, opt.withQueue, rs) + prf := NewGRPCServer(conf, opt.withLogger, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, opt.withNodeMgr, opt.withTxMgr, opt.withQueue, rs, nil) listener, port := listenAvailPort(t) t.Logf("proxy listening on port %d", port) diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index f2aa9f03d..a50bee79c 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -363,7 +363,7 @@ func TestPropagateReplicationJob(t *testing.T) { replmgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queue, rs, nodeMgr, NodeSetFromNodeManager(nodeMgr)) - prf := NewGRPCServer(conf, logEntry, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs) + prf := NewGRPCServer(conf, logEntry, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, nil) listener, port := listenAvailPort(t) ctx, cancel := testhelper.Context() diff --git a/internal/praefect/server.go b/internal/praefect/server.go index ea263b602..e82bd7826 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -46,6 +46,7 @@ func NewGRPCServer( txMgr *transactions.Manager, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore, + rfs info.ReplicationFactorSetter, grpcOpts ...grpc.ServerOption, ) *grpc.Server { ctxTagOpts := []grpc_ctxtags.Option{ @@ -93,7 +94,7 @@ func NewGRPCServer( warnDupeAddrs(logger, conf) srv := grpc.NewServer(grpcOpts...) - registerServices(srv, nodeMgr, txMgr, conf, queue, rs) + registerServices(srv, nodeMgr, txMgr, conf, queue, rs, rfs) return srv } @@ -105,10 +106,10 @@ func proxyRequiredOpts(director proxy.StreamDirector) []grpc.ServerOption { } // registerServices registers services praefect needs to handle RPCs on its own. -func registerServices(srv *grpc.Server, nm nodes.Manager, tm *transactions.Manager, conf config.Config, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore) { +func registerServices(srv *grpc.Server, nm nodes.Manager, tm *transactions.Manager, conf config.Config, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore, rfs info.ReplicationFactorSetter) { // ServerServiceServer is necessary for the ServerInfo RPC gitalypb.RegisterServerServiceServer(srv, server.NewServer(conf, nm)) - gitalypb.RegisterPraefectInfoServiceServer(srv, info.NewServer(nm, conf, queue, rs)) + gitalypb.RegisterPraefectInfoServiceServer(srv, info.NewServer(nm, conf, queue, rs, rfs)) gitalypb.RegisterRefTransactionServer(srv, transaction.NewServer(tm)) healthpb.RegisterHealthServer(srv, health.NewServer()) diff --git a/internal/praefect/server_factory.go b/internal/praefect/server_factory.go index 568997089..79cb25699 100644 --- a/internal/praefect/server_factory.go +++ b/internal/praefect/server_factory.go @@ -12,6 +12,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" + "gitlab.com/gitlab-org/gitaly/internal/praefect/service/info" "gitlab.com/gitlab-org/gitaly/internal/praefect/transactions" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -26,6 +27,7 @@ func NewServerFactory( txMgr *transactions.Manager, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore, + rfs info.ReplicationFactorSetter, registry *protoregistry.Registry, ) *ServerFactory { return &ServerFactory{ @@ -36,6 +38,7 @@ func NewServerFactory( txMgr: txMgr, queue: queue, rs: rs, + rfs: rfs, registry: registry, } } @@ -50,6 +53,7 @@ type ServerFactory struct { txMgr *transactions.Manager queue datastore.ReplicationEventQueue rs datastore.RepositoryStore + rfs info.ReplicationFactorSetter registry *protoregistry.Registry secure, insecure []*grpc.Server } @@ -116,6 +120,7 @@ func (s *ServerFactory) createGRPC(grpcOpts ...grpc.ServerOption) *grpc.Server { s.txMgr, s.queue, s.rs, + s.rfs, grpcOpts..., ) } diff --git a/internal/praefect/server_factory_test.go b/internal/praefect/server_factory_test.go index b0fb63e06..d1601a566 100644 --- a/internal/praefect/server_factory_test.go +++ b/internal/praefect/server_factory_test.go @@ -117,7 +117,7 @@ func TestServerFactory(t *testing.T) { } t.Run("insecure", func(t *testing.T) { - praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, registry) + praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, nil, registry) defer praefectServerFactory.Stop() listener, err := net.Listen(starter.TCP, "localhost:0") @@ -146,7 +146,7 @@ func TestServerFactory(t *testing.T) { }) t.Run("secure", func(t *testing.T) { - praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, registry) + praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, nil, registry) defer praefectServerFactory.Stop() listener, err := net.Listen(starter.TCP, "localhost:0") @@ -185,7 +185,7 @@ func TestServerFactory(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, registry) + praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, nil, registry) defer praefectServerFactory.Stop() // start with tcp address @@ -253,7 +253,7 @@ func TestServerFactory(t *testing.T) { t.Run("tls key path invalid", func(t *testing.T) { badTLSKeyPath := conf badTLSKeyPath.TLS.KeyPath = "invalid" - praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, registry) + praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, nil, registry) err := praefectServerFactory.Serve(nil, true) require.EqualError(t, err, "load certificate key pair: open invalid: no such file or directory") @@ -262,7 +262,7 @@ func TestServerFactory(t *testing.T) { t.Run("tls cert path invalid", func(t *testing.T) { badTLSKeyPath := conf badTLSKeyPath.TLS.CertPath = "invalid" - praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, registry) + praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, nil, registry) err := praefectServerFactory.Serve(nil, true) require.EqualError(t, err, "load certificate key pair: open invalid: no such file or directory") diff --git a/internal/praefect/service/info/consistencycheck_test.go b/internal/praefect/service/info/consistencycheck_test.go index ee1b2a280..1044ae915 100644 --- a/internal/praefect/service/info/consistencycheck_test.go +++ b/internal/praefect/service/info/consistencycheck_test.go @@ -115,7 +115,7 @@ func TestServer_ConsistencyCheck(t *testing.T) { grpcSrv := grpc.NewServer() defer grpcSrv.Stop() - gitalypb.RegisterPraefectInfoServiceServer(grpcSrv, NewServer(nm, conf, queue, nil)) + gitalypb.RegisterPraefectInfoServiceServer(grpcSrv, NewServer(nm, conf, queue, nil, nil)) go grpcSrv.Serve(praefectListener) infoConn, err := client.Dial("unix://"+praefectAddr, nil) diff --git a/internal/praefect/service/info/replication_factor.go b/internal/praefect/service/info/replication_factor.go new file mode 100644 index 000000000..c38a2cb68 --- /dev/null +++ b/internal/praefect/service/info/replication_factor.go @@ -0,0 +1,28 @@ +package info + +import ( + "context" + "fmt" + + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" +) + +// ReplicationFactorSetter sets a repository's replication factor +type ReplicationFactorSetter interface { + // SetReplicationFactor assigns or unassigns a repository's host nodes until the desired replication factor is met. + // Please see the protobuf documentation of the method for details. + SetReplicationFactor(ctx context.Context, virtualStorage, relativePath string, replicationFactor int) ([]string, error) +} + +func (s *Server) SetReplicationFactor(ctx context.Context, req *gitalypb.SetReplicationFactorRequest) (*gitalypb.SetReplicationFactorResponse, error) { + if s.rfs == nil { + return nil, fmt.Errorf("setting replication factor is only possible when Praefect is ran with 'per_repository' elector") + } + + storages, err := s.rfs.SetReplicationFactor(ctx, req.VirtualStorage, req.RelativePath, int(req.ReplicationFactor)) + if err != nil { + return nil, fmt.Errorf("set replication factor: %w", err) + } + + return &gitalypb.SetReplicationFactorResponse{Storages: storages}, nil +} diff --git a/internal/praefect/service/info/server.go b/internal/praefect/service/info/server.go index 9a812a4d1..9cb67efa3 100644 --- a/internal/praefect/service/info/server.go +++ b/internal/praefect/service/info/server.go @@ -16,15 +16,18 @@ type Server struct { conf config.Config queue datastore.ReplicationEventQueue rs datastore.RepositoryStore + + rfs ReplicationFactorSetter } // NewServer creates a new instance of a grpc InfoServiceServer -func NewServer(nodeMgr nodes.Manager, conf config.Config, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore) gitalypb.PraefectInfoServiceServer { +func NewServer(nodeMgr nodes.Manager, conf config.Config, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore, rfs ReplicationFactorSetter) gitalypb.PraefectInfoServiceServer { return &Server{ nodeMgr: nodeMgr, conf: conf, queue: queue, rs: rs, + rfs: rfs, } } diff --git a/proto/go/gitalypb/praefect.pb.go b/proto/go/gitalypb/praefect.pb.go index d796800d4..f47e84d3e 100644 --- a/proto/go/gitalypb/praefect.pb.go +++ b/proto/go/gitalypb/praefect.pb.go @@ -24,6 +24,106 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package +// SetReplicationFactorRequest sets the desired replication factor for a repository. +type SetReplicationFactorRequest struct { + // virtual_storage is the virtual storage the repository is located in + VirtualStorage string `protobuf:"bytes,1,opt,name=virtual_storage,json=virtualStorage,proto3" json:"virtual_storage,omitempty"` + // relative_path is the relative path of the repository + RelativePath string `protobuf:"bytes,2,opt,name=relative_path,json=relativePath,proto3" json:"relative_path,omitempty"` + // replication_factor is the desired replication factor. Replication must be equal or greater than 1. + ReplicationFactor int32 `protobuf:"varint,3,opt,name=replication_factor,json=replicationFactor,proto3" json:"replication_factor,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *SetReplicationFactorRequest) Reset() { *m = SetReplicationFactorRequest{} } +func (m *SetReplicationFactorRequest) String() string { return proto.CompactTextString(m) } +func (*SetReplicationFactorRequest) ProtoMessage() {} +func (*SetReplicationFactorRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_d32bf44842ead735, []int{0} +} + +func (m *SetReplicationFactorRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_SetReplicationFactorRequest.Unmarshal(m, b) +} +func (m *SetReplicationFactorRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_SetReplicationFactorRequest.Marshal(b, m, deterministic) +} +func (m *SetReplicationFactorRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_SetReplicationFactorRequest.Merge(m, src) +} +func (m *SetReplicationFactorRequest) XXX_Size() int { + return xxx_messageInfo_SetReplicationFactorRequest.Size(m) +} +func (m *SetReplicationFactorRequest) XXX_DiscardUnknown() { + xxx_messageInfo_SetReplicationFactorRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_SetReplicationFactorRequest proto.InternalMessageInfo + +func (m *SetReplicationFactorRequest) GetVirtualStorage() string { + if m != nil { + return m.VirtualStorage + } + return "" +} + +func (m *SetReplicationFactorRequest) GetRelativePath() string { + if m != nil { + return m.RelativePath + } + return "" +} + +func (m *SetReplicationFactorRequest) GetReplicationFactor() int32 { + if m != nil { + return m.ReplicationFactor + } + return 0 +} + +// SetReplicationFactorResponse returns the assigned hosts after setting the desired replication factor. +type SetReplicationFactorResponse struct { + // storages are the storages assigned to host the repository. + Storages []string `protobuf:"bytes,1,rep,name=storages,proto3" json:"storages,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *SetReplicationFactorResponse) Reset() { *m = SetReplicationFactorResponse{} } +func (m *SetReplicationFactorResponse) String() string { return proto.CompactTextString(m) } +func (*SetReplicationFactorResponse) ProtoMessage() {} +func (*SetReplicationFactorResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_d32bf44842ead735, []int{1} +} + +func (m *SetReplicationFactorResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_SetReplicationFactorResponse.Unmarshal(m, b) +} +func (m *SetReplicationFactorResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_SetReplicationFactorResponse.Marshal(b, m, deterministic) +} +func (m *SetReplicationFactorResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_SetReplicationFactorResponse.Merge(m, src) +} +func (m *SetReplicationFactorResponse) XXX_Size() int { + return xxx_messageInfo_SetReplicationFactorResponse.Size(m) +} +func (m *SetReplicationFactorResponse) XXX_DiscardUnknown() { + xxx_messageInfo_SetReplicationFactorResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_SetReplicationFactorResponse proto.InternalMessageInfo + +func (m *SetReplicationFactorResponse) GetStorages() []string { + if m != nil { + return m.Storages + } + return nil +} + type SetAuthoritativeStorageRequest struct { VirtualStorage string `protobuf:"bytes,1,opt,name=virtual_storage,json=virtualStorage,proto3" json:"virtual_storage,omitempty"` RelativePath string `protobuf:"bytes,2,opt,name=relative_path,json=relativePath,proto3" json:"relative_path,omitempty"` @@ -37,7 +137,7 @@ func (m *SetAuthoritativeStorageRequest) Reset() { *m = SetAuthoritative func (m *SetAuthoritativeStorageRequest) String() string { return proto.CompactTextString(m) } func (*SetAuthoritativeStorageRequest) ProtoMessage() {} func (*SetAuthoritativeStorageRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_d32bf44842ead735, []int{0} + return fileDescriptor_d32bf44842ead735, []int{2} } func (m *SetAuthoritativeStorageRequest) XXX_Unmarshal(b []byte) error { @@ -89,7 +189,7 @@ func (m *SetAuthoritativeStorageResponse) Reset() { *m = SetAuthoritativ func (m *SetAuthoritativeStorageResponse) String() string { return proto.CompactTextString(m) } func (*SetAuthoritativeStorageResponse) ProtoMessage() {} func (*SetAuthoritativeStorageResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_d32bf44842ead735, []int{1} + return fileDescriptor_d32bf44842ead735, []int{3} } func (m *SetAuthoritativeStorageResponse) XXX_Unmarshal(b []byte) error { @@ -126,7 +226,7 @@ func (m *DatalossCheckRequest) Reset() { *m = DatalossCheckRequest{} } func (m *DatalossCheckRequest) String() string { return proto.CompactTextString(m) } func (*DatalossCheckRequest) ProtoMessage() {} func (*DatalossCheckRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_d32bf44842ead735, []int{2} + return fileDescriptor_d32bf44842ead735, []int{4} } func (m *DatalossCheckRequest) XXX_Unmarshal(b []byte) error { @@ -173,7 +273,7 @@ func (m *DatalossCheckResponse) Reset() { *m = DatalossCheckResponse{} } func (m *DatalossCheckResponse) String() string { return proto.CompactTextString(m) } func (*DatalossCheckResponse) ProtoMessage() {} func (*DatalossCheckResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_d32bf44842ead735, []int{3} + return fileDescriptor_d32bf44842ead735, []int{5} } func (m *DatalossCheckResponse) XXX_Unmarshal(b []byte) error { @@ -219,7 +319,7 @@ func (m *DatalossCheckResponse_Repository) Reset() { *m = DatalossCheckR func (m *DatalossCheckResponse_Repository) String() string { return proto.CompactTextString(m) } func (*DatalossCheckResponse_Repository) ProtoMessage() {} func (*DatalossCheckResponse_Repository) Descriptor() ([]byte, []int) { - return fileDescriptor_d32bf44842ead735, []int{3, 0} + return fileDescriptor_d32bf44842ead735, []int{5, 0} } func (m *DatalossCheckResponse_Repository) XXX_Unmarshal(b []byte) error { @@ -286,7 +386,7 @@ func (m *DatalossCheckResponse_Repository_Storage) Reset() { func (m *DatalossCheckResponse_Repository_Storage) String() string { return proto.CompactTextString(m) } func (*DatalossCheckResponse_Repository_Storage) ProtoMessage() {} func (*DatalossCheckResponse_Repository_Storage) Descriptor() ([]byte, []int) { - return fileDescriptor_d32bf44842ead735, []int{3, 0, 0} + return fileDescriptor_d32bf44842ead735, []int{5, 0, 0} } func (m *DatalossCheckResponse_Repository_Storage) XXX_Unmarshal(b []byte) error { @@ -339,7 +439,7 @@ func (m *RepositoryReplicasRequest) Reset() { *m = RepositoryReplicasReq func (m *RepositoryReplicasRequest) String() string { return proto.CompactTextString(m) } func (*RepositoryReplicasRequest) ProtoMessage() {} func (*RepositoryReplicasRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_d32bf44842ead735, []int{4} + return fileDescriptor_d32bf44842ead735, []int{6} } func (m *RepositoryReplicasRequest) XXX_Unmarshal(b []byte) error { @@ -379,7 +479,7 @@ func (m *RepositoryReplicasResponse) Reset() { *m = RepositoryReplicasRe func (m *RepositoryReplicasResponse) String() string { return proto.CompactTextString(m) } func (*RepositoryReplicasResponse) ProtoMessage() {} func (*RepositoryReplicasResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_d32bf44842ead735, []int{5} + return fileDescriptor_d32bf44842ead735, []int{7} } func (m *RepositoryReplicasResponse) XXX_Unmarshal(b []byte) error { @@ -430,7 +530,7 @@ func (m *RepositoryReplicasResponse_RepositoryDetails) String() string { } func (*RepositoryReplicasResponse_RepositoryDetails) ProtoMessage() {} func (*RepositoryReplicasResponse_RepositoryDetails) Descriptor() ([]byte, []int) { - return fileDescriptor_d32bf44842ead735, []int{5, 0} + return fileDescriptor_d32bf44842ead735, []int{7, 0} } func (m *RepositoryReplicasResponse_RepositoryDetails) XXX_Unmarshal(b []byte) error { @@ -486,7 +586,7 @@ func (m *ConsistencyCheckRequest) Reset() { *m = ConsistencyCheckRequest func (m *ConsistencyCheckRequest) String() string { return proto.CompactTextString(m) } func (*ConsistencyCheckRequest) ProtoMessage() {} func (*ConsistencyCheckRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_d32bf44842ead735, []int{6} + return fileDescriptor_d32bf44842ead735, []int{8} } func (m *ConsistencyCheckRequest) XXX_Unmarshal(b []byte) error { @@ -553,7 +653,7 @@ func (m *ConsistencyCheckResponse) Reset() { *m = ConsistencyCheckRespon func (m *ConsistencyCheckResponse) String() string { return proto.CompactTextString(m) } func (*ConsistencyCheckResponse) ProtoMessage() {} func (*ConsistencyCheckResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_d32bf44842ead735, []int{7} + return fileDescriptor_d32bf44842ead735, []int{9} } func (m *ConsistencyCheckResponse) XXX_Unmarshal(b []byte) error { @@ -610,6 +710,8 @@ func (m *ConsistencyCheckResponse) GetReferenceStorage() string { } func init() { + proto.RegisterType((*SetReplicationFactorRequest)(nil), "gitaly.SetReplicationFactorRequest") + proto.RegisterType((*SetReplicationFactorResponse)(nil), "gitaly.SetReplicationFactorResponse") proto.RegisterType((*SetAuthoritativeStorageRequest)(nil), "gitaly.SetAuthoritativeStorageRequest") proto.RegisterType((*SetAuthoritativeStorageResponse)(nil), "gitaly.SetAuthoritativeStorageResponse") proto.RegisterType((*DatalossCheckRequest)(nil), "gitaly.DatalossCheckRequest") @@ -626,55 +728,60 @@ func init() { func init() { proto.RegisterFile("praefect.proto", fileDescriptor_d32bf44842ead735) } var fileDescriptor_d32bf44842ead735 = []byte{ - // 757 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x55, 0x4d, 0x6f, 0xd3, 0x4c, - 0x10, 0x96, 0x93, 0xbc, 0xad, 0x3b, 0xfd, 0xde, 0xb7, 0x7d, 0x9b, 0xd7, 0x94, 0x7e, 0x18, 0x41, - 0x23, 0x41, 0x93, 0x2a, 0x45, 0xe2, 0x0a, 0x6d, 0x2f, 0x45, 0x15, 0x8d, 0x5c, 0x09, 0x24, 0x38, - 0x58, 0x6b, 0x7b, 0x9b, 0x6c, 0xd9, 0x78, 0xcd, 0xee, 0xa6, 0x92, 0x8f, 0x9c, 0x91, 0xb8, 0xf2, - 0x03, 0xfa, 0x83, 0xf8, 0x25, 0x48, 0x9c, 0x38, 0x23, 0x7f, 0xac, 0x9b, 0x26, 0x4e, 0x0b, 0xbd, - 0x79, 0x67, 0x9e, 0x79, 0x66, 0xe6, 0x99, 0xf1, 0x2e, 0x2c, 0x44, 0x02, 0x93, 0x73, 0xe2, 0xab, - 0x66, 0x24, 0xb8, 0xe2, 0x68, 0xaa, 0x4b, 0x15, 0x66, 0xb1, 0x05, 0x8c, 0x86, 0xb9, 0xcd, 0x9a, - 0x93, 0x3d, 0x2c, 0x48, 0x90, 0x9d, 0xec, 0x2b, 0x03, 0x36, 0xce, 0x88, 0x7a, 0x35, 0x50, 0x3d, - 0x2e, 0xa8, 0xc2, 0x8a, 0x5e, 0x92, 0x33, 0xc5, 0x05, 0xee, 0x12, 0x87, 0x7c, 0x1a, 0x10, 0xa9, - 0xd0, 0x0e, 0x2c, 0x5e, 0x52, 0xa1, 0x06, 0x98, 0xb9, 0x32, 0xf3, 0xd4, 0x8d, 0x2d, 0xa3, 0x31, - 0xe3, 0x2c, 0xe4, 0xe6, 0x1c, 0x8f, 0x1e, 0xc1, 0xbc, 0x20, 0x2c, 0xa5, 0x70, 0x23, 0xac, 0x7a, - 0xf5, 0x4a, 0x0a, 0x9b, 0xd3, 0xc6, 0x0e, 0x56, 0x3d, 0xb4, 0x0f, 0xab, 0x78, 0x38, 0x59, 0xc1, - 0x59, 0x4d, 0xc1, 0x2b, 0xb8, 0xa4, 0x12, 0x7b, 0x1b, 0x36, 0x27, 0x16, 0x29, 0x23, 0x1e, 0x4a, - 0x62, 0x7f, 0x36, 0x60, 0xe5, 0x08, 0x2b, 0xcc, 0xb8, 0x94, 0x87, 0x3d, 0xe2, 0x7f, 0xfc, 0xeb, - 0xf2, 0x5f, 0xc2, 0x3a, 0x0d, 0x7d, 0x36, 0x08, 0x92, 0xea, 0x85, 0xa2, 0x98, 0xb1, 0xd8, 0x15, - 0x24, 0x62, 0xd4, 0xc7, 0x8a, 0x04, 0x69, 0x37, 0xa6, 0x63, 0xe5, 0x98, 0x8e, 0x86, 0x38, 0x05, - 0xc2, 0xfe, 0x55, 0x81, 0xd5, 0x91, 0x1a, 0xb2, 0xea, 0xd0, 0x09, 0xcc, 0x09, 0x12, 0x71, 0x49, - 0x15, 0x17, 0x94, 0xc8, 0x7a, 0x65, 0xab, 0xda, 0x98, 0x6d, 0x37, 0x9a, 0xd9, 0x7c, 0x9a, 0xa5, - 0x41, 0x4d, 0x47, 0x47, 0xc4, 0xce, 0x8d, 0x68, 0xeb, 0x6b, 0x05, 0xe0, 0xda, 0x39, 0xae, 0xbb, - 0x51, 0xa2, 0xfb, 0x09, 0x98, 0x79, 0xfb, 0x3a, 0xfb, 0xde, 0x9f, 0x66, 0x6f, 0x6a, 0xad, 0x0b, - 0x06, 0xf4, 0x00, 0x66, 0x04, 0xc1, 0x81, 0xcb, 0x43, 0x16, 0xa7, 0x93, 0x33, 0x1d, 0x33, 0x31, - 0x9c, 0x86, 0x2c, 0x46, 0x75, 0x98, 0x8e, 0x04, 0xed, 0x63, 0x11, 0xd7, 0x6b, 0x69, 0x25, 0xfa, - 0x68, 0xbd, 0x85, 0x69, 0xad, 0x36, 0x82, 0x5a, 0x88, 0xfb, 0x7a, 0x16, 0xe9, 0x77, 0xc2, 0xea, - 0x91, 0x1e, 0x0d, 0x03, 0xd7, 0x8b, 0x53, 0xb9, 0xab, 0x8e, 0x99, 0x19, 0x0e, 0x62, 0x64, 0x81, - 0x89, 0xa5, 0xa4, 0xdd, 0x90, 0x04, 0x3a, 0xa3, 0x3e, 0xdb, 0xa7, 0xf0, 0xff, 0x90, 0x58, 0xd9, - 0x40, 0xa4, 0x5e, 0x80, 0x36, 0x40, 0xa1, 0x5e, 0x9c, 0xe6, 0x9b, 0x6d, 0x23, 0xdd, 0xfb, 0x50, - 0xd8, 0x10, 0xca, 0xbe, 0xaa, 0x80, 0x55, 0xc6, 0x98, 0x8f, 0xf3, 0xcd, 0x75, 0x87, 0x19, 0xdf, - 0xf3, 0x12, 0xbe, 0x91, 0xa0, 0x21, 0xd7, 0x11, 0x51, 0x98, 0x32, 0x59, 0xe8, 0x82, 0x3a, 0x60, - 0xe6, 0x8b, 0xa6, 0x87, 0x73, 0x3f, 0xc2, 0x82, 0xc5, 0xf2, 0x61, 0x79, 0xcc, 0x7d, 0x1f, 0x25, - 0x12, 0xd9, 0xfd, 0x64, 0x2f, 0xe4, 0xa0, 0x9f, 0xff, 0xcf, 0xc5, 0xd9, 0xfe, 0x6e, 0xc0, 0xda, - 0x21, 0x0f, 0x25, 0x95, 0x8a, 0x84, 0x7e, 0x7c, 0xbf, 0xdf, 0xee, 0x31, 0x2c, 0x28, 0x2c, 0xba, - 0x44, 0x15, 0xb8, 0x2c, 0xcd, 0x7c, 0x66, 0xd5, 0xb0, 0xa7, 0xb0, 0x2c, 0xc8, 0x39, 0x11, 0x24, - 0xf4, 0x47, 0xef, 0x8c, 0xa5, 0xc2, 0xa1, 0xc1, 0x2f, 0x60, 0x2d, 0xa0, 0x12, 0x7b, 0x8c, 0xb8, - 0x82, 0xf8, 0x3c, 0xf4, 0x29, 0x63, 0x14, 0x2b, 0xca, 0xc3, 0x74, 0x23, 0x4d, 0xe7, 0xbf, 0xdc, - 0xed, 0xdc, 0xf4, 0xda, 0x3f, 0x0c, 0xa8, 0x8f, 0x77, 0x94, 0x4f, 0xfd, 0x19, 0xa0, 0x44, 0x18, - 0xb7, 0xec, 0x67, 0x5b, 0x4a, 0x3c, 0xce, 0xf0, 0x0f, 0xb7, 0x03, 0x8b, 0x79, 0x5f, 0x23, 0xfa, - 0xe5, 0xed, 0x1e, 0xe6, 0x56, 0xb4, 0x9b, 0xd0, 0xea, 0xce, 0x0a, 0x6c, 0xd6, 0xda, 0x75, 0xcf, - 0x05, 0x7c, 0x03, 0x66, 0x93, 0x29, 0xbb, 0x17, 0xdc, 0x73, 0x69, 0x90, 0xf6, 0x53, 0x73, 0x66, - 0x12, 0xd3, 0x6b, 0xee, 0x1d, 0x07, 0xe5, 0x42, 0xfd, 0x53, 0x2e, 0x54, 0xfb, 0x4b, 0x15, 0xfe, - 0xed, 0xe4, 0x6f, 0xc6, 0x71, 0x78, 0xce, 0xcf, 0x88, 0xb8, 0xa4, 0x3e, 0x41, 0x1f, 0x00, 0x8d, - 0x2f, 0x1e, 0xda, 0xbe, 0x6d, 0x29, 0xd3, 0xb1, 0x5b, 0xf6, 0xdd, 0x7b, 0x8b, 0xde, 0xc1, 0xd2, - 0xa8, 0xc6, 0x68, 0x53, 0xc7, 0x4d, 0xd8, 0x27, 0x6b, 0x6b, 0x32, 0x20, 0xa3, 0xdd, 0x33, 0xd0, - 0x09, 0xcc, 0xdf, 0xb8, 0xcb, 0xd0, 0xfa, 0x84, 0x2b, 0x2e, 0xa3, 0x7c, 0x78, 0xeb, 0x05, 0x88, - 0x2e, 0x60, 0x6d, 0xc2, 0xa3, 0x83, 0x9e, 0xe8, 0xc8, 0xdb, 0x9f, 0x4e, 0x6b, 0xe7, 0x4e, 0x5c, - 0x96, 0xcb, 0xaa, 0xfd, 0xfc, 0xd6, 0x30, 0x0e, 0xf6, 0xde, 0x27, 0x78, 0x86, 0xbd, 0xa6, 0xcf, - 0xfb, 0xad, 0xec, 0x73, 0x97, 0x8b, 0x6e, 0x2b, 0x63, 0x69, 0xa5, 0x4f, 0x76, 0xab, 0xcb, 0xf3, - 0x73, 0xe4, 0x79, 0x53, 0xa9, 0x69, 0xff, 0x77, 0x00, 0x00, 0x00, 0xff, 0xff, 0xda, 0xfe, 0xa3, - 0x74, 0xf9, 0x07, 0x00, 0x00, + // 834 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x56, 0x4f, 0x6f, 0xdc, 0x44, + 0x14, 0x97, 0x77, 0xd3, 0x76, 0xf3, 0xf2, 0xa7, 0xc9, 0x90, 0x12, 0xe3, 0x86, 0x76, 0xeb, 0x02, + 0x59, 0x09, 0xba, 0x89, 0x52, 0x24, 0x24, 0x4e, 0xd0, 0x54, 0x48, 0x45, 0x11, 0x8d, 0x1c, 0x09, + 0x24, 0x38, 0x58, 0x63, 0x7b, 0xb2, 0x3b, 0x65, 0xd6, 0x63, 0x66, 0x66, 0x23, 0xf9, 0xc8, 0x17, + 0xe0, 0x8a, 0xc4, 0xb5, 0x1f, 0x88, 0x4f, 0x02, 0xe2, 0xc4, 0x19, 0x79, 0xfe, 0x78, 0x9d, 0x5d, + 0xef, 0x06, 0x22, 0x71, 0xf3, 0xbc, 0xf7, 0x7b, 0xff, 0x7e, 0xef, 0xcd, 0x1b, 0xc3, 0x76, 0x21, + 0x30, 0xb9, 0x24, 0xa9, 0x1a, 0x16, 0x82, 0x2b, 0x8e, 0xee, 0x8e, 0xa8, 0xc2, 0xac, 0x0c, 0x80, + 0xd1, 0xdc, 0xca, 0x82, 0x4d, 0x39, 0xc6, 0x82, 0x64, 0xe6, 0x14, 0xfe, 0xe6, 0xc1, 0xc3, 0x0b, + 0xa2, 0x22, 0x52, 0x30, 0x9a, 0x62, 0x45, 0x79, 0xfe, 0x15, 0x4e, 0x15, 0x17, 0x11, 0xf9, 0x69, + 0x4a, 0xa4, 0x42, 0x87, 0x70, 0xff, 0x8a, 0x0a, 0x35, 0xc5, 0x2c, 0x96, 0x8a, 0x0b, 0x3c, 0x22, + 0xbe, 0xd7, 0xf7, 0x06, 0xeb, 0xd1, 0xb6, 0x15, 0x5f, 0x18, 0x29, 0x7a, 0x0a, 0x5b, 0x82, 0x30, + 0xac, 0xe8, 0x15, 0x89, 0x0b, 0xac, 0xc6, 0x7e, 0x47, 0xc3, 0x36, 0x9d, 0xf0, 0x1c, 0xab, 0x31, + 0x7a, 0x06, 0x48, 0xcc, 0x22, 0xc5, 0x97, 0x3a, 0x94, 0xdf, 0xed, 0x7b, 0x83, 0x3b, 0xd1, 0xae, + 0x98, 0xcf, 0x21, 0xfc, 0x1c, 0x0e, 0xda, 0x73, 0x93, 0x05, 0xcf, 0x25, 0x41, 0x01, 0xf4, 0x6c, + 0x52, 0xd2, 0xf7, 0xfa, 0xdd, 0xc1, 0x7a, 0x54, 0x9f, 0xc3, 0xb7, 0x1e, 0x3c, 0xba, 0x20, 0xea, + 0xcb, 0xa9, 0x1a, 0x73, 0x41, 0x95, 0xce, 0xc1, 0xe6, 0xfa, 0xff, 0xd4, 0xf6, 0x1c, 0x1e, 0xe0, + 0x66, 0xb0, 0xda, 0x67, 0x57, 0x83, 0xf7, 0x70, 0x4b, 0x26, 0xe1, 0x13, 0x78, 0xbc, 0x34, 0x49, + 0x53, 0x64, 0xf8, 0xb3, 0x07, 0x7b, 0x2f, 0xb1, 0xc2, 0x8c, 0x4b, 0x79, 0x3a, 0x26, 0xe9, 0x8f, + 0xff, 0x39, 0xfd, 0x2f, 0xe0, 0x80, 0xe6, 0x29, 0x9b, 0x66, 0x55, 0xf6, 0x42, 0x51, 0xcc, 0x58, + 0x19, 0x3b, 0xb6, 0x49, 0xa6, 0xab, 0xe9, 0x45, 0x81, 0xc5, 0x9c, 0x3b, 0x48, 0x54, 0x23, 0xc2, + 0xbf, 0x3b, 0xf0, 0x60, 0x2e, 0x07, 0xdb, 0x82, 0x33, 0xd8, 0x14, 0xa4, 0xe0, 0x92, 0x2a, 0x2e, + 0x28, 0x91, 0x7e, 0xa7, 0xdf, 0x1d, 0x6c, 0x9c, 0x0c, 0x86, 0x66, 0xf0, 0x86, 0xad, 0x46, 0xc3, + 0xc8, 0x59, 0x94, 0xd1, 0x35, 0xeb, 0xe0, 0x97, 0x0e, 0xc0, 0x4c, 0xb9, 0xc8, 0xbb, 0xd7, 0xc2, + 0xfb, 0x59, 0x63, 0x08, 0x4c, 0xf4, 0xe3, 0x7f, 0x1b, 0x7d, 0xe8, 0xb8, 0xae, 0x3d, 0xa0, 0x87, + 0xb0, 0x2e, 0x08, 0xce, 0x62, 0x9e, 0xb3, 0x52, 0x77, 0xae, 0x17, 0xf5, 0x2a, 0xc1, 0xeb, 0x9c, + 0x95, 0xc8, 0x87, 0x7b, 0x85, 0xa0, 0x13, 0x2c, 0x4a, 0x7f, 0x4d, 0x67, 0xe2, 0x8e, 0xc1, 0xb7, + 0x70, 0xcf, 0xb1, 0x8d, 0x60, 0x2d, 0xc7, 0x13, 0xd7, 0x0b, 0xfd, 0x5d, 0x79, 0x4d, 0xc8, 0x98, + 0xe6, 0x59, 0x9c, 0x94, 0x9a, 0xee, 0x6e, 0xd4, 0x33, 0x82, 0x17, 0x65, 0x35, 0xc5, 0x58, 0x4a, + 0x3a, 0xca, 0x49, 0xe6, 0x22, 0xba, 0x73, 0xf8, 0x1a, 0xde, 0x6b, 0x90, 0x65, 0x1a, 0x22, 0xdd, + 0x00, 0x9c, 0x00, 0xd4, 0xec, 0x95, 0x3a, 0xde, 0xc6, 0x09, 0x72, 0xb5, 0x37, 0xcc, 0x1a, 0xa8, + 0xf0, 0x6d, 0x07, 0x82, 0x36, 0x8f, 0xb6, 0x9d, 0xdf, 0xcc, 0x2a, 0x34, 0xfe, 0x3e, 0x6d, 0xf1, + 0x37, 0x67, 0xd4, 0x50, 0xbd, 0x24, 0x0a, 0x53, 0x26, 0x6b, 0x5e, 0xd0, 0x39, 0xf4, 0xec, 0xa0, + 0xb9, 0xe6, 0xdc, 0xce, 0x61, 0xed, 0x25, 0x48, 0x61, 0x77, 0x41, 0x7d, 0x1b, 0x26, 0x2a, 0xda, + 0xd3, 0x6a, 0x2e, 0xe4, 0x74, 0x62, 0xef, 0x73, 0x7d, 0x0e, 0x7f, 0xf7, 0x60, 0xff, 0x94, 0xe7, + 0x92, 0x4a, 0x45, 0xf2, 0xb4, 0xbc, 0xdd, 0xb5, 0xfb, 0x10, 0xb6, 0x15, 0x16, 0x23, 0xa2, 0x6a, + 0x9c, 0x09, 0xb3, 0x65, 0xa4, 0x0e, 0xf6, 0x31, 0xec, 0x0a, 0x72, 0x49, 0x04, 0xc9, 0xd3, 0xf9, + 0x9d, 0xb1, 0x53, 0x2b, 0x1c, 0xf8, 0x33, 0xd8, 0xcf, 0xa8, 0xc4, 0x09, 0x23, 0xb1, 0x20, 0x29, + 0xcf, 0x53, 0xca, 0x18, 0xd5, 0xab, 0x51, 0x4f, 0x64, 0x2f, 0x7a, 0xd7, 0xaa, 0xa3, 0xeb, 0xda, + 0xf0, 0x0f, 0x0f, 0xfc, 0xc5, 0x8a, 0x6c, 0xd7, 0x3f, 0xd1, 0x6b, 0x99, 0xc7, 0x6d, 0x97, 0x6d, + 0xa7, 0xd2, 0x44, 0xcd, 0x0b, 0x77, 0x08, 0xf7, 0x6d, 0x5d, 0x73, 0xfc, 0xd9, 0x72, 0x4f, 0xad, + 0xd4, 0x6c, 0x7b, 0x57, 0x59, 0x8d, 0x35, 0xa5, 0xcd, 0x6a, 0xae, 0xe1, 0x8f, 0x60, 0xa3, 0xea, + 0x72, 0xfc, 0x86, 0x27, 0x31, 0xcd, 0x74, 0x3d, 0x6b, 0xd1, 0x7a, 0x25, 0xfa, 0x9a, 0x27, 0xaf, + 0xb2, 0x76, 0xa2, 0xee, 0xb4, 0x13, 0x75, 0xf2, 0x67, 0x17, 0xde, 0x39, 0xb7, 0x8f, 0xe1, 0xab, + 0xfc, 0x92, 0x5f, 0x10, 0x71, 0x45, 0x53, 0x82, 0x7e, 0x00, 0xb4, 0x38, 0x78, 0xe8, 0xc9, 0xaa, + 0xa1, 0xd4, 0x6d, 0x0f, 0xc2, 0x9b, 0xe7, 0x16, 0x7d, 0x07, 0x3b, 0xf3, 0x1c, 0xa3, 0xc7, 0xce, + 0x6e, 0xc9, 0x3c, 0x05, 0xfd, 0xe5, 0x00, 0xe3, 0xf6, 0xd8, 0x43, 0x67, 0xb0, 0x75, 0x6d, 0x97, + 0xa1, 0x83, 0x25, 0x2b, 0xce, 0xb8, 0x7c, 0x7f, 0xe5, 0x02, 0x44, 0x6f, 0x60, 0x7f, 0xc9, 0xa3, + 0x83, 0x3e, 0x72, 0x96, 0xab, 0x9f, 0xce, 0xe0, 0xf0, 0x46, 0x9c, 0x8d, 0x85, 0x61, 0xaf, 0xed, + 0x09, 0x47, 0x4f, 0x1b, 0x0e, 0x96, 0xfd, 0x7c, 0x04, 0x1f, 0xac, 0x06, 0x99, 0x10, 0xc1, 0xda, + 0x5f, 0xbf, 0x0e, 0xbc, 0x17, 0xc7, 0xdf, 0x57, 0x60, 0x86, 0x93, 0x61, 0xca, 0x27, 0x47, 0xe6, + 0xf3, 0x19, 0x17, 0xa3, 0x23, 0xe3, 0xe2, 0x48, 0xff, 0xee, 0x1c, 0x8d, 0xb8, 0x3d, 0x17, 0x49, + 0x72, 0x57, 0x8b, 0x9e, 0xff, 0x13, 0x00, 0x00, 0xff, 0xff, 0x99, 0x51, 0xfc, 0x0b, 0x35, 0x09, + 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -701,6 +808,15 @@ type PraefectInfoServiceClient interface { // This causes the current version of the repository on the authoritative storage to be considered the // latest and overwrite any other version on the virtual storage. SetAuthoritativeStorage(ctx context.Context, in *SetAuthoritativeStorageRequest, opts ...grpc.CallOption) (*SetAuthoritativeStorageResponse, error) + // SetReplicationFactor assigns or unassigns host nodes from the repository to meet the desired replication factor. + // SetReplicationFactor returns an error when trying to set a replication factor that exceeds the storage node count + // in the virtual storage. An error is also returned when trying to set a replication factor below one. The primary node + // won't be unassigned as it needs a copy of the repository to accept writes. Likewise, the primary is the first storage + // that gets assigned when setting a replication factor for a repository. Assignments of unconfigured storages are ignored. + // This might cause the actual replication factor to be higher than desired if the replication factor is set during an upgrade + // from a Praefect node that does not yet know about a new node. As assignments of unconfigured storages are ignored, replication + // factor of repositories assigned to a storage node removed from the cluster is effectively decreased. + SetReplicationFactor(ctx context.Context, in *SetReplicationFactorRequest, opts ...grpc.CallOption) (*SetReplicationFactorResponse, error) } type praefectInfoServiceClient struct { @@ -770,6 +886,15 @@ func (c *praefectInfoServiceClient) SetAuthoritativeStorage(ctx context.Context, return out, nil } +func (c *praefectInfoServiceClient) SetReplicationFactor(ctx context.Context, in *SetReplicationFactorRequest, opts ...grpc.CallOption) (*SetReplicationFactorResponse, error) { + out := new(SetReplicationFactorResponse) + err := c.cc.Invoke(ctx, "/gitaly.PraefectInfoService/SetReplicationFactor", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // PraefectInfoServiceServer is the server API for PraefectInfoService service. type PraefectInfoServiceServer interface { RepositoryReplicas(context.Context, *RepositoryReplicasRequest) (*RepositoryReplicasResponse, error) @@ -784,6 +909,15 @@ type PraefectInfoServiceServer interface { // This causes the current version of the repository on the authoritative storage to be considered the // latest and overwrite any other version on the virtual storage. SetAuthoritativeStorage(context.Context, *SetAuthoritativeStorageRequest) (*SetAuthoritativeStorageResponse, error) + // SetReplicationFactor assigns or unassigns host nodes from the repository to meet the desired replication factor. + // SetReplicationFactor returns an error when trying to set a replication factor that exceeds the storage node count + // in the virtual storage. An error is also returned when trying to set a replication factor below one. The primary node + // won't be unassigned as it needs a copy of the repository to accept writes. Likewise, the primary is the first storage + // that gets assigned when setting a replication factor for a repository. Assignments of unconfigured storages are ignored. + // This might cause the actual replication factor to be higher than desired if the replication factor is set during an upgrade + // from a Praefect node that does not yet know about a new node. As assignments of unconfigured storages are ignored, replication + // factor of repositories assigned to a storage node removed from the cluster is effectively decreased. + SetReplicationFactor(context.Context, *SetReplicationFactorRequest) (*SetReplicationFactorResponse, error) } // UnimplementedPraefectInfoServiceServer can be embedded to have forward compatible implementations. @@ -802,6 +936,9 @@ func (*UnimplementedPraefectInfoServiceServer) DatalossCheck(ctx context.Context func (*UnimplementedPraefectInfoServiceServer) SetAuthoritativeStorage(ctx context.Context, req *SetAuthoritativeStorageRequest) (*SetAuthoritativeStorageResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method SetAuthoritativeStorage not implemented") } +func (*UnimplementedPraefectInfoServiceServer) SetReplicationFactor(ctx context.Context, req *SetReplicationFactorRequest) (*SetReplicationFactorResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method SetReplicationFactor not implemented") +} func RegisterPraefectInfoServiceServer(s *grpc.Server, srv PraefectInfoServiceServer) { s.RegisterService(&_PraefectInfoService_serviceDesc, srv) @@ -882,6 +1019,24 @@ func _PraefectInfoService_SetAuthoritativeStorage_Handler(srv interface{}, ctx c return interceptor(ctx, in, info, handler) } +func _PraefectInfoService_SetReplicationFactor_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(SetReplicationFactorRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(PraefectInfoServiceServer).SetReplicationFactor(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/gitaly.PraefectInfoService/SetReplicationFactor", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(PraefectInfoServiceServer).SetReplicationFactor(ctx, req.(*SetReplicationFactorRequest)) + } + return interceptor(ctx, in, info, handler) +} + var _PraefectInfoService_serviceDesc = grpc.ServiceDesc{ ServiceName: "gitaly.PraefectInfoService", HandlerType: (*PraefectInfoServiceServer)(nil), @@ -898,6 +1053,10 @@ var _PraefectInfoService_serviceDesc = grpc.ServiceDesc{ MethodName: "SetAuthoritativeStorage", Handler: _PraefectInfoService_SetAuthoritativeStorage_Handler, }, + { + MethodName: "SetReplicationFactor", + Handler: _PraefectInfoService_SetReplicationFactor_Handler, + }, }, Streams: []grpc.StreamDesc{ { diff --git a/proto/praefect.proto b/proto/praefect.proto index 5d2c05970..e9676d20c 100644 --- a/proto/praefect.proto +++ b/proto/praefect.proto @@ -24,6 +24,32 @@ service PraefectInfoService { // This causes the current version of the repository on the authoritative storage to be considered the // latest and overwrite any other version on the virtual storage. rpc SetAuthoritativeStorage(SetAuthoritativeStorageRequest) returns (SetAuthoritativeStorageResponse); + + // SetReplicationFactor assigns or unassigns host nodes from the repository to meet the desired replication factor. + // SetReplicationFactor returns an error when trying to set a replication factor that exceeds the storage node count + // in the virtual storage. An error is also returned when trying to set a replication factor below one. The primary node + // won't be unassigned as it needs a copy of the repository to accept writes. Likewise, the primary is the first storage + // that gets assigned when setting a replication factor for a repository. Assignments of unconfigured storages are ignored. + // This might cause the actual replication factor to be higher than desired if the replication factor is set during an upgrade + // from a Praefect node that does not yet know about a new node. As assignments of unconfigured storages are ignored, replication + // factor of repositories assigned to a storage node removed from the cluster is effectively decreased. + rpc SetReplicationFactor(SetReplicationFactorRequest) returns (SetReplicationFactorResponse); +} + +// SetReplicationFactorRequest sets the desired replication factor for a repository. +message SetReplicationFactorRequest { + // virtual_storage is the virtual storage the repository is located in + string virtual_storage = 1; + // relative_path is the relative path of the repository + string relative_path = 2; + // replication_factor is the desired replication factor. Replication must be equal or greater than 1. + int32 replication_factor = 3; +} + +// SetReplicationFactorResponse returns the assigned hosts after setting the desired replication factor. +message SetReplicationFactorResponse { + // storages are the storages assigned to host the repository. + repeated string storages = 1; } message SetAuthoritativeStorageRequest { diff --git a/ruby/proto/gitaly/praefect_pb.rb b/ruby/proto/gitaly/praefect_pb.rb index 73f17f34a..b5f5da567 100644 --- a/ruby/proto/gitaly/praefect_pb.rb +++ b/ruby/proto/gitaly/praefect_pb.rb @@ -7,6 +7,14 @@ require 'lint_pb' require 'shared_pb' Google::Protobuf::DescriptorPool.generated_pool.build do add_file("praefect.proto", :syntax => :proto3) do + add_message "gitaly.SetReplicationFactorRequest" do + optional :virtual_storage, :string, 1 + optional :relative_path, :string, 2 + optional :replication_factor, :int32, 3 + end + add_message "gitaly.SetReplicationFactorResponse" do + repeated :storages, :string, 1 + end add_message "gitaly.SetAuthoritativeStorageRequest" do optional :virtual_storage, :string, 1 optional :relative_path, :string, 2 @@ -60,6 +68,8 @@ Google::Protobuf::DescriptorPool.generated_pool.build do end module Gitaly + SetReplicationFactorRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.SetReplicationFactorRequest").msgclass + SetReplicationFactorResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.SetReplicationFactorResponse").msgclass SetAuthoritativeStorageRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.SetAuthoritativeStorageRequest").msgclass SetAuthoritativeStorageResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.SetAuthoritativeStorageResponse").msgclass DatalossCheckRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.DatalossCheckRequest").msgclass diff --git a/ruby/proto/gitaly/praefect_services_pb.rb b/ruby/proto/gitaly/praefect_services_pb.rb index 5d1e4ae70..9eb597889 100644 --- a/ruby/proto/gitaly/praefect_services_pb.rb +++ b/ruby/proto/gitaly/praefect_services_pb.rb @@ -26,6 +26,15 @@ module Gitaly # This causes the current version of the repository on the authoritative storage to be considered the # latest and overwrite any other version on the virtual storage. rpc :SetAuthoritativeStorage, Gitaly::SetAuthoritativeStorageRequest, Gitaly::SetAuthoritativeStorageResponse + # SetReplicationFactor assigns or unassigns host nodes from the repository to meet the desired replication factor. + # SetReplicationFactor returns an error when trying to set a replication factor that exceeds the storage node count + # in the virtual storage. An error is also returned when trying to set a replication factor below one. The primary node + # won't be unassigned as it needs a copy of the repository to accept writes. Likewise, the primary is the first storage + # that gets assigned when setting a replication factor for a repository. Assignments of unconfigured storages are ignored. + # This might cause the actual replication factor to be higher than desired if the replication factor is set during an upgrade + # from a Praefect node that does not yet know about a new node. As assignments of unconfigured storages are ignored, replication + # factor of repositories assigned to a storage node removed from the cluster is effectively decreased. + rpc :SetReplicationFactor, Gitaly::SetReplicationFactorRequest, Gitaly::SetReplicationFactorResponse end Stub = Service.rpc_stub_class |