gitlab.com/gitlab-org/gitaly.git
author     Patrick Steinhardt <psteinhardt@gitlab.com>  2022-03-03 15:34:27 +0300
committer  Patrick Steinhardt <psteinhardt@gitlab.com>  2022-03-30 10:02:32 +0300
commit     985de5088680b848eadfb86ee92a019960cfbfd9 (patch)
tree       f924d996d63df929ccb2391d1079974421077bae
parent     9dd39f4a51757e8d037404ccaac1310ef1b1da3c (diff)
praefect: Implement routing for maintenance operations
We have introduced a new "maintenance" operation type for RPCs which allows us to easily special-case repositories which fall into this class. One of those locations where we do wish to special-case them is the coordinator. All RPCs which we're about to convert to be maintenance operations are currently classified as mutators. As a consequence, we:

- Need to explicitly special-case those RPCs to not have transactions enabled because they shouldn't change any on-disk state as visible to the user. Furthermore, the state we're writing to disk is often not deterministic.

- Because transactions are disabled, maintenance is never performed on multiple nodes at once. As a consequence, we always create replication jobs for maintenance jobs, which requires quite some architecture and persistence of parameters in the database.

- Skip nodes which are not currently consistent with the state other nodes have. This is not a requirement though: because the maintenance RPCs shouldn't have an impact on user-visible state anyway, we don't really care about consistency, only about routability.

Introduce special routing for those new maintenance operations. We're using a best-effort strategy: every node that is hosting the repository at hand and which is online will be included. Furthermore, we never create replication jobs. It should be acceptable to just wait for the next time a maintenance RPC is scheduled because they shouldn't have an impact on serviceability.

With these changes in place we can eventually get rid of replication for maintenance-style RPCs, which is a good step towards deprecating the complete replication queue.

Changelog: changed
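For orientation, here is a minimal, self-contained sketch of the node-selection rule described above: route to every online node that is assigned to the repository and actually has a copy of it, never enqueue replication jobs, and fail only if no such node exists. The Replica type, routeMaintenance function and storage names below are illustrative stand-ins and not part of the Gitaly codebase; the authoritative implementation is PerRepositoryRouter.RouteRepositoryMaintenance in the diff that follows.

package main

import (
	"errors"
	"fmt"
)

// Replica is a simplified, hypothetical view of one copy of a repository as
// the router sees it: which storage hosts it, whether that storage is assigned
// to the repository, and whether a copy actually exists (generation >= 0).
type Replica struct {
	Storage    string
	Assigned   bool
	Generation int
}

var errNoHealthyNodes = errors.New("no healthy nodes")

// routeMaintenance returns every healthy, assigned storage that has a copy of
// the repository. Unlike mutator routing it does not care whether the copies
// are consistent with each other, and it never emits replication jobs.
func routeMaintenance(replicas []Replica, healthy map[string]bool) ([]string, error) {
	var nodes []string
	for _, replica := range replicas {
		if !healthy[replica.Storage] || !replica.Assigned || replica.Generation < 0 {
			continue
		}
		nodes = append(nodes, replica.Storage)
	}
	if len(nodes) == 0 {
		return nil, errNoHealthyNodes
	}
	return nodes, nil
}

func main() {
	nodes, err := routeMaintenance([]Replica{
		{Storage: "gitaly-1", Assigned: true, Generation: 3},
		{Storage: "gitaly-2", Assigned: true, Generation: -1}, // no copy yet: skipped
		{Storage: "gitaly-3", Assigned: true, Generation: 3},  // unhealthy below: skipped
	}, map[string]bool{"gitaly-1": true, "gitaly-2": true, "gitaly-3": false})
	if err != nil {
		panic(err)
	}
	fmt.Println(nodes) // [gitaly-1]
}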
-rw-r--r--  internal/metadata/featureflag/ff_maintenance_operation_routing.go  4
-rw-r--r--  internal/praefect/coordinator.go  69
-rw-r--r--  internal/praefect/coordinator_test.go  190
-rw-r--r--  internal/praefect/mock/mock.pb.go  27
-rw-r--r--  internal/praefect/mock/mock.proto  8
-rw-r--r--  internal/praefect/mock/mock_grpc.pb.go  38
-rw-r--r--  internal/praefect/router.go  15
-rw-r--r--  internal/praefect/router_node_manager.go  31
-rw-r--r--  internal/praefect/router_per_repository.go  53
-rw-r--r--  internal/praefect/router_per_repository_test.go  213
10 files changed, 638 insertions(+), 10 deletions(-)
diff --git a/internal/metadata/featureflag/ff_maintenance_operation_routing.go b/internal/metadata/featureflag/ff_maintenance_operation_routing.go
new file mode 100644
index 000000000..9f108af9c
--- /dev/null
+++ b/internal/metadata/featureflag/ff_maintenance_operation_routing.go
@@ -0,0 +1,4 @@
+package featureflag
+
+// MaintenanceOperationRouting enables routing logic that is specific to maintenance operations.
+var MaintenanceOperationRouting = NewFeatureFlag("maintenance_operation_routing", false)
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index f55cebfd5..557fa439e 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -331,6 +331,12 @@ func (c *Coordinator) directRepositoryScopedMessage(ctx context.Context, call gr
ps, err = c.accessorStreamParameters(ctx, call)
case protoregistry.OpMutator:
ps, err = c.mutatorStreamParameters(ctx, call)
+ case protoregistry.OpMaintenance:
+ if featureflag.MaintenanceOperationRouting.IsEnabled(ctx) {
+ ps, err = c.maintenanceStreamParameters(ctx, call)
+ } else {
+ ps, err = c.mutatorStreamParameters(ctx, call)
+ }
default:
err = fmt.Errorf("unknown operation type: %v", call.methodInfo.Operation)
}
@@ -584,6 +590,69 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
return proxy.NewStreamParameters(primaryDest, secondaryDests, reqFinalizer, nil), nil
}
+// maintenanceStreamParameters returns stream parameters for a maintenance-style RPC. The RPC call
+// is proxied to all nodes. Because it shouldn't matter whether a node is the primary or not in this
+// context, we just pick the first node returned by the router to be the primary. Returns an error
+// in case any of the nodes has failed to perform the maintenance RPC.
+func (c *Coordinator) maintenanceStreamParameters(ctx context.Context, call grpcCall) (*proxy.StreamParameters, error) {
+ route, err := c.router.RouteRepositoryMaintenance(ctx, call.targetRepo.StorageName, call.targetRepo.RelativePath)
+ if err != nil {
+ return nil, fmt.Errorf("routing repository maintenance: %w", err)
+ }
+
+ peerCtx := streamParametersContext(ctx)
+
+ nodeDests := make([]proxy.Destination, 0, len(route.Nodes))
+ nodeErrors := &nodeErrors{
+ errByNode: make(map[string]error),
+ }
+
+ for _, node := range route.Nodes {
+ node := node
+
+ nodeMsg, err := rewrittenRepositoryMessage(call.methodInfo, call.msg, node.Storage, route.ReplicaPath, "")
+ if err != nil {
+ return nil, err
+ }
+
+ nodeDests = append(nodeDests, proxy.Destination{
+ Ctx: peerCtx,
+ Conn: node.Connection,
+ Msg: nodeMsg,
+ ErrHandler: func(err error) error {
+ nodeErrors.Lock()
+ defer nodeErrors.Unlock()
+ nodeErrors.errByNode[node.Storage] = err
+
+ ctxlogrus.Extract(ctx).WithField("gitaly_storage", node.Storage).WithError(err).Error("proxying maintenance RPC to node failed")
+
+ // We ignore any errors returned by nodes such that they all have a
+ // chance to finish their maintenance RPC in a best-effort strategy.
+ // In case any node fails though, the RPC call will return with an
+ // error after all nodes have finished.
+ return nil
+ },
+ })
+ }
+
+ return proxy.NewStreamParameters(nodeDests[0], nodeDests[1:], func() error {
+ nodeErrors.Lock()
+ defer nodeErrors.Unlock()
+
+ // In case any of the nodes has recorded an error we will return it. It shouldn't
+ // matter which error we return exactly, so we just return errors in the order we've
+ // got from the router. This also has the nice property that any error returned by
+ // the primary node would be prioritized over all the others.
+ for _, node := range route.Nodes {
+ if nodeErr, ok := nodeErrors.errByNode[node.Storage]; ok && nodeErr != nil {
+ return nodeErr
+ }
+ }
+
+ return nil
+ }, nil), nil
+}
+
// streamParametersContexts converts the contexts with incoming metadata into a context that is
// usable by peer Gitaly nodes.
func streamParametersContext(ctx context.Context) context.Context {
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 3fc515da0..0ea9219ba 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -386,6 +386,196 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) {
require.NoError(t, err)
}
+func TestStreamDirector_maintenance(t *testing.T) {
+ t.Parallel()
+
+ node1 := &config.Node{
+ Address: "unix://" + testhelper.GetTemporaryGitalySocketFileName(t),
+ Storage: "praefect-internal-1",
+ }
+
+ node2 := &config.Node{
+ Address: "unix://" + testhelper.GetTemporaryGitalySocketFileName(t),
+ Storage: "praefect-internal-2",
+ }
+
+ cfg := config.Config{
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: "praefect",
+ Nodes: []*config.Node{node1, node2},
+ },
+ },
+ }
+
+ db := testdb.New(t)
+
+ repo := gitalypb.Repository{
+ StorageName: "praefect",
+ RelativePath: "/path/to/hashed/storage",
+ }
+
+ ctx := testhelper.Context(t)
+
+ nodeSet, err := DialNodes(ctx, cfg.VirtualStorages, protoregistry.GitalyProtoPreregistered, nil, nil, nil)
+ require.NoError(t, err)
+ defer nodeSet.Close()
+
+ tx := db.Begin(t)
+ defer tx.Rollback(t)
+
+ rs := datastore.NewPostgresRepositoryStore(tx, cfg.StorageNames())
+ require.NoError(t, rs.CreateRepository(ctx, 1, repo.StorageName, repo.RelativePath,
+ repo.RelativePath, node1.Storage, []string{node2.Storage}, nil, true, true))
+
+ testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": cfg.StorageNames()})
+
+ registry, err := protoregistry.NewFromPaths("praefect/mock/mock.proto")
+ require.NoError(t, err)
+
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(tx))
+
+ coordinator := NewCoordinator(
+ queueInterceptor,
+ rs,
+ NewPerRepositoryRouter(
+ nodeSet.Connections(),
+ nodes.NewPerRepositoryElector(tx),
+ StaticHealthChecker(cfg.StorageNames()),
+ NewLockedRandom(rand.New(rand.NewSource(0))),
+ rs,
+ datastore.NewAssignmentStore(tx, cfg.StorageNames()),
+ rs,
+ nil,
+ ),
+ nil,
+ cfg,
+ registry,
+ )
+
+ message, err := proto.Marshal(&mock.RepoRequest{
+ Repo: &repo,
+ })
+ require.NoError(t, err)
+
+ methodInfo, err := registry.LookupMethod("/mock.SimpleService/RepoMaintenanceUnary")
+ require.NoError(t, err)
+
+ for _, tc := range []struct {
+ desc string
+ primaryErr error
+ secondaryErr error
+ expectedErr error
+ }{
+ {
+ desc: "successful",
+ },
+ {
+ desc: "primary returns an error",
+ primaryErr: helper.ErrNotFoundf("primary error"),
+ expectedErr: helper.ErrNotFoundf("primary error"),
+ },
+ {
+ desc: "secondary returns an error",
+ secondaryErr: helper.ErrNotFoundf("secondary error"),
+ expectedErr: helper.ErrNotFoundf("secondary error"),
+ },
+ {
+ desc: "primary error preferred",
+ primaryErr: helper.ErrNotFoundf("primary error"),
+ secondaryErr: helper.ErrNotFoundf("secondary error"),
+ expectedErr: helper.ErrNotFoundf("primary error"),
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ // Behaviour between the new and old routing is sufficiently different that
+ // it doesn't make sense to try and cram both tests in here.
+ ctx := featureflag.ContextWithFeatureFlag(ctx, featureflag.MaintenanceOperationRouting, true)
+
+ queueInterceptor.OnEnqueue(func(context.Context, datastore.ReplicationEvent, datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
+ require.FailNow(t, "no replication jobs should have been created")
+ return datastore.ReplicationEvent{}, fmt.Errorf("unexpected call")
+ })
+
+ streamParams, err := coordinator.StreamDirector(ctx, methodInfo.FullMethodName(), &mockPeeker{message})
+ require.NoError(t, err)
+
+ var targetNodes []string
+ destinationByAddress := make(map[string]proxy.Destination)
+ for _, node := range append(streamParams.Secondaries(), streamParams.Primary()) {
+ targetNodes = append(targetNodes, node.Conn.Target())
+ destinationByAddress[node.Conn.Target()] = node
+ }
+
+ // Assert that both nodes are part of the stream parameters. Because the
+ // order is not deterministic (we randomly shuffle primary and secondary
+ // nodes) we only assert that elements match, not that any of both nodes has
+ // a specific role.
+ require.ElementsMatch(t, []string{
+ node1.Address,
+ node2.Address,
+ }, targetNodes)
+
+ // Assert that the target repositories were rewritten as expected for all of
+ // the nodes.
+ for _, nodeCfg := range []*config.Node{node1, node2} {
+ destination, ok := destinationByAddress[nodeCfg.Address]
+ require.True(t, ok)
+
+ request, err := methodInfo.UnmarshalRequestProto(destination.Msg)
+ require.NoError(t, err)
+
+ rewrittenTargetRepo, err := methodInfo.TargetRepo(request)
+ require.NoError(t, err)
+ require.Equal(t, nodeCfg.Storage, rewrittenTargetRepo.GetStorageName(), "stream director should have rewritten the storage name")
+ }
+
+ // We now inject errors into the streams by manually executing the error
+ // handlers. This simulates that the RPCs have been proxied and may or may
+ // not have returned an error.
+ //
+ // Note that these functions should not return an error: this is a strict
+ // requirement because otherwise failures returned by the primary would
+ // abort the operation on secondaries.
+ require.Nil(t, streamParams.Primary().ErrHandler(tc.primaryErr))
+ require.Nil(t, streamParams.Secondaries()[0].ErrHandler(tc.secondaryErr))
+
+ // The request finalizer should then see the errors as injected above.
+ require.Equal(t, tc.expectedErr, streamParams.RequestFinalizer())
+ })
+ }
+
+ t.Run("disabled maintenance routing", func(t *testing.T) {
+ ctx := featureflag.ContextWithFeatureFlag(ctx, featureflag.MaintenanceOperationRouting, false)
+
+ replicationEventCounter := 0
+ queueInterceptor.OnEnqueue(func(context.Context, datastore.ReplicationEvent, datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
+ replicationEventCounter++
+ return datastore.ReplicationEvent{}, nil
+ })
+
+ streamParams, err := coordinator.StreamDirector(ctx, methodInfo.FullMethodName(), &mockPeeker{message})
+ require.NoError(t, err)
+
+ // We're cheating a bit: the method we use here is not something that the
+ // coordinator would know, and thus it wouldn't ever set up transactions for that
+ // call either. This is accidentally the same behaviour as for any of the other
+ // preexisting maintenance-style RPCs though, so it's good enough. Furthermore, we
+ // really only want to ensure that the feature flag does its thing. The actual logic
+ // of mutator stream parameters is tested elsewhere already.
+ require.Equal(t, node1.Address, streamParams.Primary().Conn.Target())
+ require.Empty(t, streamParams.Secondaries())
+
+ // There shouldn't be any replication event yet.
+ require.Zero(t, replicationEventCounter)
+
+ require.NoError(t, streamParams.RequestFinalizer())
+
+ // But there should be one after having called the request finalizer.
+ require.Equal(t, 1, replicationEventCounter)
+ })
+}
+
type mockRouter struct {
Router
routeRepositoryAccessorFunc func(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RepositoryAccessorRoute, error)
diff --git a/internal/praefect/mock/mock.pb.go b/internal/praefect/mock/mock.pb.go
index 76ba4ff21..ffdd2df38 100644
--- a/internal/praefect/mock/mock.pb.go
+++ b/internal/praefect/mock/mock.pb.go
@@ -86,7 +86,7 @@ var file_praefect_mock_mock_proto_rawDesc = []byte{
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2c, 0x0a, 0x04, 0x72, 0x65, 0x70, 0x6f, 0x18, 0x01,
0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65,
0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x42, 0x04, 0x98, 0xc6, 0x2c, 0x01, 0x52, 0x04,
- 0x72, 0x65, 0x70, 0x6f, 0x32, 0x9e, 0x01, 0x0a, 0x0d, 0x53, 0x69, 0x6d, 0x70, 0x6c, 0x65, 0x53,
+ 0x72, 0x65, 0x70, 0x6f, 0x32, 0xe9, 0x01, 0x0a, 0x0d, 0x53, 0x69, 0x6d, 0x70, 0x6c, 0x65, 0x53,
0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x46, 0x0a, 0x11, 0x52, 0x65, 0x70, 0x6f, 0x41, 0x63,
0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x55, 0x6e, 0x61, 0x72, 0x79, 0x12, 0x11, 0x2e, 0x6d, 0x6f,
0x63, 0x6b, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16,
@@ -96,11 +96,16 @@ var file_praefect_mock_mock_proto_rawDesc = []byte{
0x72, 0x79, 0x12, 0x11, 0x2e, 0x6d, 0x6f, 0x63, 0x6b, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x52, 0x65,
0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x06, 0xfa,
- 0x97, 0x28, 0x02, 0x08, 0x01, 0x42, 0x39, 0x5a, 0x37, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e,
- 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, 0x67, 0x2f, 0x67,
- 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x34, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e,
- 0x61, 0x6c, 0x2f, 0x70, 0x72, 0x61, 0x65, 0x66, 0x65, 0x63, 0x74, 0x2f, 0x6d, 0x6f, 0x63, 0x6b,
- 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+ 0x97, 0x28, 0x02, 0x08, 0x01, 0x12, 0x49, 0x0a, 0x14, 0x52, 0x65, 0x70, 0x6f, 0x4d, 0x61, 0x69,
+ 0x6e, 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x63, 0x65, 0x55, 0x6e, 0x61, 0x72, 0x79, 0x12, 0x11, 0x2e,
+ 0x6d, 0x6f, 0x63, 0x6b, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
+ 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62,
+ 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x06, 0xfa, 0x97, 0x28, 0x02, 0x08, 0x03,
+ 0x42, 0x39, 0x5a, 0x37, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67,
+ 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79,
+ 0x2f, 0x76, 0x31, 0x34, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x72,
+ 0x61, 0x65, 0x66, 0x65, 0x63, 0x74, 0x2f, 0x6d, 0x6f, 0x63, 0x6b, 0x62, 0x06, 0x70, 0x72, 0x6f,
+ 0x74, 0x6f, 0x33,
}
var (
@@ -125,10 +130,12 @@ var file_praefect_mock_mock_proto_depIdxs = []int32{
1, // 0: mock.RepoRequest.repo:type_name -> gitaly.Repository
0, // 1: mock.SimpleService.RepoAccessorUnary:input_type -> mock.RepoRequest
0, // 2: mock.SimpleService.RepoMutatorUnary:input_type -> mock.RepoRequest
- 2, // 3: mock.SimpleService.RepoAccessorUnary:output_type -> google.protobuf.Empty
- 2, // 4: mock.SimpleService.RepoMutatorUnary:output_type -> google.protobuf.Empty
- 3, // [3:5] is the sub-list for method output_type
- 1, // [1:3] is the sub-list for method input_type
+ 0, // 3: mock.SimpleService.RepoMaintenanceUnary:input_type -> mock.RepoRequest
+ 2, // 4: mock.SimpleService.RepoAccessorUnary:output_type -> google.protobuf.Empty
+ 2, // 5: mock.SimpleService.RepoMutatorUnary:output_type -> google.protobuf.Empty
+ 2, // 6: mock.SimpleService.RepoMaintenanceUnary:output_type -> google.protobuf.Empty
+ 4, // [4:7] is the sub-list for method output_type
+ 1, // [1:4] is the sub-list for method input_type
1, // [1:1] is the sub-list for extension type_name
1, // [1:1] is the sub-list for extension extendee
0, // [0:1] is the sub-list for field type_name
diff --git a/internal/praefect/mock/mock.proto b/internal/praefect/mock/mock.proto
index 694ac2e4f..748befbf6 100644
--- a/internal/praefect/mock/mock.proto
+++ b/internal/praefect/mock/mock.proto
@@ -33,4 +33,12 @@ service SimpleService {
scope_level: REPOSITORY
};
}
+
+ // RepoMaintenanceUnary is a unary RPC that maintains a repo
+ rpc RepoMaintenanceUnary(RepoRequest) returns (google.protobuf.Empty) {
+ option (gitaly.op_type) = {
+ op: MAINTENANCE
+ scope_level: REPOSITORY
+ };
+ }
}
diff --git a/internal/praefect/mock/mock_grpc.pb.go b/internal/praefect/mock/mock_grpc.pb.go
index 60203340b..0f3f1522a 100644
--- a/internal/praefect/mock/mock_grpc.pb.go
+++ b/internal/praefect/mock/mock_grpc.pb.go
@@ -23,6 +23,8 @@ type SimpleServiceClient interface {
RepoAccessorUnary(ctx context.Context, in *RepoRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
// RepoMutatorUnary is a unary RPC that mutates a repo
RepoMutatorUnary(ctx context.Context, in *RepoRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
+ // RepoMaintenanceUnary is a unary RPC that maintains a repo
+ RepoMaintenanceUnary(ctx context.Context, in *RepoRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
}
type simpleServiceClient struct {
@@ -51,6 +53,15 @@ func (c *simpleServiceClient) RepoMutatorUnary(ctx context.Context, in *RepoRequ
return out, nil
}
+func (c *simpleServiceClient) RepoMaintenanceUnary(ctx context.Context, in *RepoRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
+ out := new(emptypb.Empty)
+ err := c.cc.Invoke(ctx, "/mock.SimpleService/RepoMaintenanceUnary", in, out, opts...)
+ if err != nil {
+ return nil, err
+ }
+ return out, nil
+}
+
// SimpleServiceServer is the server API for SimpleService service.
// All implementations must embed UnimplementedSimpleServiceServer
// for forward compatibility
@@ -59,6 +70,8 @@ type SimpleServiceServer interface {
RepoAccessorUnary(context.Context, *RepoRequest) (*emptypb.Empty, error)
// RepoMutatorUnary is a unary RPC that mutates a repo
RepoMutatorUnary(context.Context, *RepoRequest) (*emptypb.Empty, error)
+ // RepoMaintenanceUnary is a unary RPC that maintains a repo
+ RepoMaintenanceUnary(context.Context, *RepoRequest) (*emptypb.Empty, error)
mustEmbedUnimplementedSimpleServiceServer()
}
@@ -72,6 +85,9 @@ func (UnimplementedSimpleServiceServer) RepoAccessorUnary(context.Context, *Repo
func (UnimplementedSimpleServiceServer) RepoMutatorUnary(context.Context, *RepoRequest) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method RepoMutatorUnary not implemented")
}
+func (UnimplementedSimpleServiceServer) RepoMaintenanceUnary(context.Context, *RepoRequest) (*emptypb.Empty, error) {
+ return nil, status.Errorf(codes.Unimplemented, "method RepoMaintenanceUnary not implemented")
+}
func (UnimplementedSimpleServiceServer) mustEmbedUnimplementedSimpleServiceServer() {}
// UnsafeSimpleServiceServer may be embedded to opt out of forward compatibility for this service.
@@ -121,6 +137,24 @@ func _SimpleService_RepoMutatorUnary_Handler(srv interface{}, ctx context.Contex
return interceptor(ctx, in, info, handler)
}
+func _SimpleService_RepoMaintenanceUnary_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+ in := new(RepoRequest)
+ if err := dec(in); err != nil {
+ return nil, err
+ }
+ if interceptor == nil {
+ return srv.(SimpleServiceServer).RepoMaintenanceUnary(ctx, in)
+ }
+ info := &grpc.UnaryServerInfo{
+ Server: srv,
+ FullMethod: "/mock.SimpleService/RepoMaintenanceUnary",
+ }
+ handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+ return srv.(SimpleServiceServer).RepoMaintenanceUnary(ctx, req.(*RepoRequest))
+ }
+ return interceptor(ctx, in, info, handler)
+}
+
// SimpleService_ServiceDesc is the grpc.ServiceDesc for SimpleService service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@@ -136,6 +170,10 @@ var SimpleService_ServiceDesc = grpc.ServiceDesc{
MethodName: "RepoMutatorUnary",
Handler: _SimpleService_RepoMutatorUnary_Handler,
},
+ {
+ MethodName: "RepoMaintenanceUnary",
+ Handler: _SimpleService_RepoMaintenanceUnary_Handler,
+ },
},
Streams: []grpc.StreamDesc{},
Metadata: "praefect/mock/mock.proto",
diff --git a/internal/praefect/router.go b/internal/praefect/router.go
index 1b6f19387..da9dbd36f 100644
--- a/internal/praefect/router.go
+++ b/internal/praefect/router.go
@@ -49,6 +49,16 @@ type RepositoryMutatorRoute struct {
ReplicationTargets []string
}
+// RepositoryMaintenanceRoute describes how to route a repository scoped maintenance call.
+type RepositoryMaintenanceRoute struct {
+ // RepositoryID is the repository's ID as Praefect identifies it.
+ RepositoryID int64
+ // ReplicaPath is the disk path where the replicas are stored.
+ ReplicaPath string
+ // Nodes contains all the nodes the call should be routed to.
+ Nodes []RouterNode
+}
+
// Router decides which nodes to direct accessor and mutator RPCs to.
type Router interface {
// RouteStorageAccessor returns the node which should serve the storage accessor request.
@@ -66,4 +76,9 @@ type Router interface {
// RouteRepositoryCreation decides returns the primary and secondaries that should handle the repository creation
// request. It is up to the caller to store the assignments and primary information after finishing the RPC.
RouteRepositoryCreation(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error)
+ // RouteRepositoryMaintenance routes the given maintenance-style RPC to all nodes which
+ // should perform maintenance. This would typically include all online nodes, regardless of
+ // whether the repository hosted by them is up-to-date or not. Maintenance tasks should
+ // never be replicated.
+ RouteRepositoryMaintenance(ctx context.Context, virtualStorage, relativePath string) (RepositoryMaintenanceRoute, error)
}
diff --git a/internal/praefect/router_node_manager.go b/internal/praefect/router_node_manager.go
index 9cceaea0a..f1f3c474a 100644
--- a/internal/praefect/router_node_manager.go
+++ b/internal/praefect/router_node_manager.go
@@ -151,3 +151,34 @@ func (r *nodeManagerRouter) RouteRepositoryCreation(ctx context.Context, virtual
ReplicationTargets: replicationTargets,
}, nil
}
+
+// RouteRepositoryMaintenance includes all healthy nodes regardless of whether they're consistent or
+// not.
+func (r *nodeManagerRouter) RouteRepositoryMaintenance(ctx context.Context, virtualStorage, relativePath string) (RepositoryMaintenanceRoute, error) {
+ shard, err := r.mgr.GetShard(ctx, virtualStorage)
+ if err != nil {
+ return RepositoryMaintenanceRoute{}, fmt.Errorf("get shard: %w", err)
+ }
+
+ nodes := make([]RouterNode, 0, 1+len(shard.Secondaries))
+
+ if shard.Primary.IsHealthy() {
+ nodes = append(nodes, toRouterNode(shard.Primary))
+ }
+
+ for _, secondary := range shard.Secondaries {
+ if secondary.IsHealthy() {
+ nodes = append(nodes, toRouterNode(secondary))
+ continue
+ }
+ }
+
+ if len(nodes) == 0 {
+ return RepositoryMaintenanceRoute{}, ErrNoHealthyNodes
+ }
+
+ return RepositoryMaintenanceRoute{
+ ReplicaPath: relativePath,
+ Nodes: nodes,
+ }, nil
+}
diff --git a/internal/praefect/router_per_repository.go b/internal/praefect/router_per_repository.go
index 2685829f8..6f1b4e7bd 100644
--- a/internal/praefect/router_per_repository.go
+++ b/internal/praefect/router_per_repository.go
@@ -367,3 +367,56 @@ func (r *PerRepositoryRouter) RouteRepositoryCreation(ctx context.Context, virtu
ReplicationTargets: replicationTargets,
}, nil
}
+
+// RouteRepositoryMaintenance will route the maintenance call to all healthy nodes in a best-effort
+// strategy. We do not raise an error in case the primary node is unhealthy, but will in case all
+// nodes are unhealthy.
+func (r *PerRepositoryRouter) RouteRepositoryMaintenance(ctx context.Context, virtualStorage, relativePath string) (RepositoryMaintenanceRoute, error) {
+ healthyNodes, err := r.healthyNodes(virtualStorage)
+ if err != nil {
+ return RepositoryMaintenanceRoute{}, err
+ }
+
+ healthyNodesByStorage := map[string]RouterNode{}
+ for _, healthyNode := range healthyNodes {
+ healthyNodesByStorage[healthyNode.Storage] = healthyNode
+ }
+
+ metadata, err := r.rs.GetRepositoryMetadataByPath(ctx, virtualStorage, relativePath)
+ if err != nil {
+ return RepositoryMaintenanceRoute{}, fmt.Errorf("getting repository metadata: %w", err)
+ }
+
+ nodes := make([]RouterNode, 0, len(metadata.Replicas))
+ for _, replica := range metadata.Replicas {
+ node, ok := healthyNodesByStorage[replica.Storage]
+ if !ok {
+ continue
+ }
+
+ // If the node is not assigned to the replica, the replica either hasn't yet been
+ // created or it will eventually get deleted. In neither case does it make sense to
+ // maintain it, so we skip such nodes.
+ if !replica.Assigned {
+ continue
+ }
+
+ // If the repository doesn't exist on the replica there is no need to perform any
+ // maintenance tasks at all.
+ if replica.Generation < 0 {
+ continue
+ }
+
+ nodes = append(nodes, node)
+ }
+
+ if len(nodes) == 0 {
+ return RepositoryMaintenanceRoute{}, ErrNoHealthyNodes
+ }
+
+ return RepositoryMaintenanceRoute{
+ RepositoryID: metadata.RepositoryID,
+ ReplicaPath: metadata.ReplicaPath,
+ Nodes: nodes,
+ }, nil
+}
diff --git a/internal/praefect/router_per_repository_test.go b/internal/praefect/router_per_repository_test.go
index dfc68552d..44d8b10ce 100644
--- a/internal/praefect/router_per_repository_test.go
+++ b/internal/praefect/router_per_repository_test.go
@@ -7,6 +7,7 @@ import (
"testing"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
@@ -448,6 +449,218 @@ func TestPerRepositoryRouter_RouteRepositoryMutator(t *testing.T) {
}
}
+func TestPerRepositoryRouter_RouteRepositoryMaintenance(t *testing.T) {
+ t.Parallel()
+
+ db := testdb.New(t)
+
+ var (
+ virtualStorage = "virtual-storage-1"
+ relativePath = gittest.NewRepositoryName(t, true)
+ )
+
+ configuredStorages := []string{"primary", "secondary-1", "secondary-2"}
+
+ for _, tc := range []struct {
+ desc string
+ virtualStorage string
+ healthyStorages []string
+ assignedStorages []string
+ storageGenerations map[string]int
+ expectedStorages []string
+ expectedErr error
+ }{
+ {
+ desc: "unknown virtual storage",
+ virtualStorage: "unknown",
+ expectedErr: nodes.ErrVirtualStorageNotExist,
+ },
+ {
+ desc: "all nodes unhealthy",
+ virtualStorage: virtualStorage,
+ storageGenerations: map[string]int{
+ "primary": 1,
+ "secondary-1": 1,
+ "secondary-2": 1,
+ },
+ expectedErr: ErrNoHealthyNodes,
+ },
+ {
+ desc: "unhealthy primary",
+ virtualStorage: virtualStorage,
+ healthyStorages: []string{"secondary-1", "secondary-2"},
+ storageGenerations: map[string]int{
+ "primary": 1,
+ "secondary-1": 1,
+ "secondary-2": 1,
+ },
+ expectedStorages: []string{"secondary-1", "secondary-2"},
+ },
+ {
+ desc: "unhealthy secondaries",
+ virtualStorage: virtualStorage,
+ healthyStorages: []string{"primary"},
+ storageGenerations: map[string]int{
+ "primary": 1,
+ "secondary-1": 1,
+ "secondary-2": 1,
+ },
+ expectedStorages: []string{"primary"},
+ },
+ {
+ desc: "all nodes healthy",
+ virtualStorage: virtualStorage,
+ healthyStorages: configuredStorages,
+ storageGenerations: map[string]int{
+ "primary": 1,
+ "secondary-1": 1,
+ "secondary-2": 1,
+ },
+ expectedStorages: []string{"primary", "secondary-1", "secondary-2"},
+ },
+ {
+ desc: "unassigned primary is ignored",
+ virtualStorage: virtualStorage,
+ healthyStorages: configuredStorages,
+ assignedStorages: []string{"secondary-1", "secondary-2"},
+ storageGenerations: map[string]int{
+ "primary": 1,
+ "secondary-1": 1,
+ "secondary-2": 1,
+ },
+ expectedStorages: []string{"secondary-1", "secondary-2"},
+ },
+ {
+ desc: "unassigned secondary is ignored",
+ virtualStorage: virtualStorage,
+ healthyStorages: configuredStorages,
+ assignedStorages: []string{"primary", "secondary-1"},
+ storageGenerations: map[string]int{
+ "primary": 1,
+ "secondary-1": 1,
+ "secondary-2": 1,
+ },
+ expectedStorages: []string{"primary", "secondary-1"},
+ },
+ {
+ desc: "no assigned nodes",
+ virtualStorage: virtualStorage,
+ healthyStorages: configuredStorages,
+ storageGenerations: map[string]int{
+ "primary": 1,
+ "secondary-1": 1,
+ "secondary-2": 1,
+ },
+ expectedStorages: []string{"primary", "secondary-1", "secondary-2"},
+ },
+ {
+ desc: "unhealthy and unassigned nodes",
+ virtualStorage: virtualStorage,
+ healthyStorages: []string{"primary", "secondary-1"},
+ assignedStorages: []string{"primary", "secondary-2"},
+ storageGenerations: map[string]int{
+ "primary": 1,
+ "secondary-1": 1,
+ "secondary-2": 1,
+ },
+ expectedStorages: []string{"primary"},
+ },
+ {
+ desc: "missing repo on primary",
+ virtualStorage: virtualStorage,
+ healthyStorages: configuredStorages,
+ assignedStorages: configuredStorages,
+ storageGenerations: map[string]int{
+ "primary": -1,
+ "secondary-1": 9000,
+ "secondary-2": 9000,
+ },
+ expectedStorages: []string{"secondary-1", "secondary-2"},
+ },
+ {
+ desc: "missing repo on secondary",
+ virtualStorage: virtualStorage,
+ healthyStorages: configuredStorages,
+ assignedStorages: configuredStorages,
+ storageGenerations: map[string]int{
+ "primary": 9000,
+ "secondary-1": 9000,
+ "secondary-2": -1,
+ },
+ expectedStorages: []string{"primary", "secondary-1"},
+ },
+ {
+ desc: "mixed generations",
+ virtualStorage: virtualStorage,
+ healthyStorages: configuredStorages,
+ assignedStorages: configuredStorages,
+ storageGenerations: map[string]int{
+ "primary": 0,
+ "secondary-1": 1,
+ "secondary-2": 2,
+ },
+ expectedStorages: []string{"primary", "secondary-1", "secondary-2"},
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ ctx := testhelper.Context(t)
+
+ tx := db.Begin(t)
+ defer tx.Rollback(t)
+
+ conns := Connections{
+ virtualStorage: {
+ "primary": &grpc.ClientConn{},
+ "secondary-1": &grpc.ClientConn{},
+ "secondary-2": &grpc.ClientConn{},
+ },
+ }
+
+ rs := datastore.NewPostgresRepositoryStore(tx, map[string][]string{
+ virtualStorage: configuredStorages,
+ })
+
+ repositoryID, err := rs.ReserveRepositoryID(ctx, virtualStorage, relativePath)
+ require.NoError(t, err)
+ require.NoError(t, rs.CreateRepository(ctx, repositoryID, virtualStorage, relativePath, relativePath, "primary", []string{"secondary-1", "secondary-2"}, nil, true, false))
+
+ for _, storage := range tc.assignedStorages {
+ _, err := tx.ExecContext(ctx, `
+ INSERT INTO repository_assignments
+ VALUES ($1, $2, $3, $4)
+ `, virtualStorage, relativePath, storage, repositoryID)
+ require.NoError(t, err)
+ }
+
+ for storage, generation := range tc.storageGenerations {
+ require.NoError(t, rs.SetGeneration(ctx, repositoryID, storage, relativePath, generation))
+ }
+
+ router := NewPerRepositoryRouter(conns, nil, StaticHealthChecker{
+ virtualStorage: tc.healthyStorages,
+ }, nil, nil, nil, rs, nil)
+
+ route, err := router.RouteRepositoryMaintenance(ctx, tc.virtualStorage, relativePath)
+ require.Equal(t, tc.expectedErr, err)
+ if err == nil {
+ var expectedStorages []RouterNode
+ for _, expectedNode := range tc.expectedStorages {
+ expectedStorages = append(expectedStorages, RouterNode{
+ Storage: expectedNode,
+ Connection: conns[tc.virtualStorage][expectedNode],
+ })
+ }
+
+ require.Equal(t, RepositoryMaintenanceRoute{
+ RepositoryID: repositoryID,
+ ReplicaPath: relativePath,
+ Nodes: expectedStorages,
+ }, route)
+ }
+ })
+ }
+}
+
func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
t.Parallel()